在方法返回任务上使用 Parallel.ForEach - 避免 Task.WaitAll
Use Parallel.ForEach on method returning task - avoid Task.WaitAll
我有一个方法需要 IWorkItem
,开始处理它和 returns 相关任务。由于使用了外部库,该方法必须如下所示。
public Task WorkOn(IWorkItem workItem)
{
//...start asynchronous operation, return task
}
我想在多个工作项上完成这项工作。我不知道他们中会有多少人——也许 1 人,也许 10 000 人。
WorkOn
方法有内部池,如果达到太多并行执行,可能需要等待。 (如 SemaphoreSlim.Wait
):
public Task WorkOn(IWorkItem workItem)
{
_semaphoreSlim.Wait();
}
我目前的解决方案是:
public void Do(params IWorkItem[] workItems)
{
var tasks = new Task[workItems.Length];
for (var i = 0; i < workItems.Length; i++)
{
tasks[i] = WorkOn(workItems[i]);
}
Task.WaitAll(tasks);
}
问题:在这种情况下我可以使用某种方式 Parallel.ForEach 吗?为避免创建 10000 个任务后因 WorkOn
的节流而等待?
其实没那么容易。您可以使用 Parallel.ForEach
来限制生成的任务数量。但我不确定 perform/behave 你的情况如何。
作为一般经验法则,我通常尽量避免混合使用 Task
和 Parallel
。
当然你可以这样做:
public void Do(params IWorkItem[] workItems)
{
Parallel.ForEach(workItems, (workItem) => WorkOn(workItem).Wait());
}
在 "normal" 条件下,这应该可以很好地限制您的并发性。
您也可以完全 async
-await
并使用一些技巧为您的并发添加一些限制。但是在那种情况下你必须自己做并发限制。
const int ConcurrencyLimit = 8;
public async Task Do(params IWorkItem[] workItems)
{
var cursor = 0;
var currentlyProcessing = new List<Task>(ConcurrencyLimit);
while (cursor < workItems.Length)
{
while (currentlyProcessing.Count < ConcurrencyLimit && cursor < workItems.Length)
{
currentlyProcessing.Add(WorkOn(workItems[cursor]));
cursor++;
}
Task finished = await Task.WhenAny(currentlyProcessing);
currentlyProcessing.Remove(finished);
}
await Task.WhenAll(currentlyProcessing);
}
就像我说的……要复杂得多。但它也会将并发限制为您应用的任何值。此外,它还正确地使用了 async
-await
模式。如果您不想要非阻塞多线程,您可以轻松地将此函数包装到另一个函数中,并对由此函数 return 编辑的任务执行阻塞 .Wait
。
此实现的关键是 Task.WhenAny
函数。此函数将 return 已完成的任务应用到任务列表中(由 await
.
的另一任务包装)
我有一个方法需要 IWorkItem
,开始处理它和 returns 相关任务。由于使用了外部库,该方法必须如下所示。
public Task WorkOn(IWorkItem workItem)
{
//...start asynchronous operation, return task
}
我想在多个工作项上完成这项工作。我不知道他们中会有多少人——也许 1 人,也许 10 000 人。
WorkOn
方法有内部池,如果达到太多并行执行,可能需要等待。 (如 SemaphoreSlim.Wait
):
public Task WorkOn(IWorkItem workItem)
{
_semaphoreSlim.Wait();
}
我目前的解决方案是:
public void Do(params IWorkItem[] workItems)
{
var tasks = new Task[workItems.Length];
for (var i = 0; i < workItems.Length; i++)
{
tasks[i] = WorkOn(workItems[i]);
}
Task.WaitAll(tasks);
}
问题:在这种情况下我可以使用某种方式 Parallel.ForEach 吗?为避免创建 10000 个任务后因 WorkOn
的节流而等待?
其实没那么容易。您可以使用 Parallel.ForEach
来限制生成的任务数量。但我不确定 perform/behave 你的情况如何。
作为一般经验法则,我通常尽量避免混合使用 Task
和 Parallel
。
当然你可以这样做:
public void Do(params IWorkItem[] workItems)
{
Parallel.ForEach(workItems, (workItem) => WorkOn(workItem).Wait());
}
在 "normal" 条件下,这应该可以很好地限制您的并发性。
您也可以完全 async
-await
并使用一些技巧为您的并发添加一些限制。但是在那种情况下你必须自己做并发限制。
const int ConcurrencyLimit = 8;
public async Task Do(params IWorkItem[] workItems)
{
var cursor = 0;
var currentlyProcessing = new List<Task>(ConcurrencyLimit);
while (cursor < workItems.Length)
{
while (currentlyProcessing.Count < ConcurrencyLimit && cursor < workItems.Length)
{
currentlyProcessing.Add(WorkOn(workItems[cursor]));
cursor++;
}
Task finished = await Task.WhenAny(currentlyProcessing);
currentlyProcessing.Remove(finished);
}
await Task.WhenAll(currentlyProcessing);
}
就像我说的……要复杂得多。但它也会将并发限制为您应用的任何值。此外,它还正确地使用了 async
-await
模式。如果您不想要非阻塞多线程,您可以轻松地将此函数包装到另一个函数中,并对由此函数 return 编辑的任务执行阻塞 .Wait
。
此实现的关键是 Task.WhenAny
函数。此函数将 return 已完成的任务应用到任务列表中(由 await
.