Parallel.Invoke 任务繁忙时等待
Parallel.Invoke wait if tasks are busy
有人可以帮我写下面的代码吗?在行中:
Parallel.Invoke(parallelOptions, () => dosomething(message));
我想调用最多 5 个并行任务(如果有 5 个忙,等待下一个可用,然后启动它...如果只有 4 个忙,启动第 5 个,等等)
private AutoResetEvent autoResetEvent1 = new AutoResetEvent(false);
private ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 5 };
private void threadProc()
{
queue.ReceiveCompleted += MyReceiveCompleted1;
Debug.WriteLine("about to start loop");
while (!shutdown)
{
queue.BeginReceive();
autoResetEvent1.WaitOne();
}
queue.ReceiveCompleted -= MyReceiveCompleted1;
queue.Dispose();
Debug.WriteLine("we are done");
}
private void MyReceiveCompleted1(object sender, ReceiveCompletedEventArgs e)
{
var message = queue.EndReceive(e.AsyncResult);
Debug.WriteLine("number of max tasks: " + parallelOptions.MaxDegreeOfParallelism);
Parallel.Invoke(parallelOptions, () => dosomething(message));
autoResetEvent1.Set();
}
private void dosomething(Message message)
{
//dummy body
var i = 0;
while (true)
{
Thread.Sleep(TimeSpan.FromSeconds(1));
i++;
if (i == 5 || i == 10 || i == 15) Debug.WriteLine("loop number: " + i + " on thread: " + Thread.CurrentThread.ManagedThreadId);
if (i == 15)
break;
}
Debug.WriteLine("finished task");
}
我现在得到的结果:
1) dosomething() 正如你在上面看到的那样,我一次只得到一个(等待)
2) dosomething() 更改为以下,我得到 none 停止 x 任务数(不受限制或遵守 MaxDegreeOfParallelism
private async Task dosomething(Message message)
{
//dummy body
var i = 0;
while (true)
{
await Task.Delay(TimeSpan.FromSeconds(1));
i++;
if (i == 5 || i == 10 || i == 15) Debug.WriteLine("loop number: " + i + " on thread: " + Thread.CurrentThread.ManagedThreadId);
if (i == 15)
break;
}
Debug.WriteLine("finished task");
}
为了实现我想要的目标,我做错了什么?
我想要的结果:
在"MyReceiveCompleted"中,我想确保只有5个并发任务在处理消息,如果有5个忙,等待一个可用。
这行代码:
Parallel.Invoke(parallelOptions, () => dosomething(message));
告诉 TPL 开始新的并行操作,只有一件事要做。所以,5 的 "max parallelism" 选项有点没有意义,因为只有一件事要做。 autoResetEvent1
确保一次只有一个并行操作,并且每个并行操作只有一件事要做,所以观察到的一次只有一件事 运行 的行为是完全可以预期的。
当您将委托更改为异步时,实际发生的是 MaxDegreeOfParallelism
仅应用于方法的 同步 部分。因此,一旦它达到第一个 await
,它就会 "leaves" 并行操作,不再受其约束。
核心问题是 Parallel
在操作数量提前已知时效果最佳——您的代码不知道;它只是从队列中读取它们并在它们到达时进行处理。正如有人评论的那样,您 可以 使用动态任务并行解决此问题,并使用 TaskScheduler
限制并发。
但是,最简单的解决方案可能是 TPL Dataflow。您可以使用适当的限制选项创建一个 ActionBlock<T>
并在消息到达时向其发送消息:
private ActionBlock<string> block = new ActionBlock<string>(
message => dosomething(message),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
private void MyReceiveCompleted1(object sender, ReceiveCompletedEventArgs e)
{
var message = queue.EndReceive(e.AsyncResult);
block.Post(message);
autoResetEvent1.Set();
}
TPL 数据流的一个优点是它还理解异步方法,并将 MaxDegreeOfParallelism
解释为最大并发度。
有人可以帮我写下面的代码吗?在行中:
Parallel.Invoke(parallelOptions, () => dosomething(message));
我想调用最多 5 个并行任务(如果有 5 个忙,等待下一个可用,然后启动它...如果只有 4 个忙,启动第 5 个,等等)
private AutoResetEvent autoResetEvent1 = new AutoResetEvent(false);
private ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 5 };
private void threadProc()
{
queue.ReceiveCompleted += MyReceiveCompleted1;
Debug.WriteLine("about to start loop");
while (!shutdown)
{
queue.BeginReceive();
autoResetEvent1.WaitOne();
}
queue.ReceiveCompleted -= MyReceiveCompleted1;
queue.Dispose();
Debug.WriteLine("we are done");
}
private void MyReceiveCompleted1(object sender, ReceiveCompletedEventArgs e)
{
var message = queue.EndReceive(e.AsyncResult);
Debug.WriteLine("number of max tasks: " + parallelOptions.MaxDegreeOfParallelism);
Parallel.Invoke(parallelOptions, () => dosomething(message));
autoResetEvent1.Set();
}
private void dosomething(Message message)
{
//dummy body
var i = 0;
while (true)
{
Thread.Sleep(TimeSpan.FromSeconds(1));
i++;
if (i == 5 || i == 10 || i == 15) Debug.WriteLine("loop number: " + i + " on thread: " + Thread.CurrentThread.ManagedThreadId);
if (i == 15)
break;
}
Debug.WriteLine("finished task");
}
我现在得到的结果:
1) dosomething() 正如你在上面看到的那样,我一次只得到一个(等待)
2) dosomething() 更改为以下,我得到 none 停止 x 任务数(不受限制或遵守 MaxDegreeOfParallelism
private async Task dosomething(Message message)
{
//dummy body
var i = 0;
while (true)
{
await Task.Delay(TimeSpan.FromSeconds(1));
i++;
if (i == 5 || i == 10 || i == 15) Debug.WriteLine("loop number: " + i + " on thread: " + Thread.CurrentThread.ManagedThreadId);
if (i == 15)
break;
}
Debug.WriteLine("finished task");
}
为了实现我想要的目标,我做错了什么?
我想要的结果:
在"MyReceiveCompleted"中,我想确保只有5个并发任务在处理消息,如果有5个忙,等待一个可用。
这行代码:
Parallel.Invoke(parallelOptions, () => dosomething(message));
告诉 TPL 开始新的并行操作,只有一件事要做。所以,5 的 "max parallelism" 选项有点没有意义,因为只有一件事要做。 autoResetEvent1
确保一次只有一个并行操作,并且每个并行操作只有一件事要做,所以观察到的一次只有一件事 运行 的行为是完全可以预期的。
当您将委托更改为异步时,实际发生的是 MaxDegreeOfParallelism
仅应用于方法的 同步 部分。因此,一旦它达到第一个 await
,它就会 "leaves" 并行操作,不再受其约束。
核心问题是 Parallel
在操作数量提前已知时效果最佳——您的代码不知道;它只是从队列中读取它们并在它们到达时进行处理。正如有人评论的那样,您 可以 使用动态任务并行解决此问题,并使用 TaskScheduler
限制并发。
但是,最简单的解决方案可能是 TPL Dataflow。您可以使用适当的限制选项创建一个 ActionBlock<T>
并在消息到达时向其发送消息:
private ActionBlock<string> block = new ActionBlock<string>(
message => dosomething(message),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
private void MyReceiveCompleted1(object sender, ReceiveCompletedEventArgs e)
{
var message = queue.EndReceive(e.AsyncResult);
block.Post(message);
autoResetEvent1.Set();
}
TPL 数据流的一个优点是它还理解异步方法,并将 MaxDegreeOfParallelism
解释为最大并发度。