在方法返回任务上使用 Parallel.ForEach - 避免 Task.WaitAll

Use Parallel.ForEach on method returning task - avoid Task.WaitAll

我有一个方法需要 IWorkItem,开始处理它和 returns 相关任务。由于使用了外部库,该方法必须如下所示。

public Task WorkOn(IWorkItem workItem)
{
    //...start asynchronous operation, return task
}

我想在多个工作项上完成这项工作。我不知道他们中会有多少人——也许 1 人,也许 10 000 人。 WorkOn 方法有内部池,如果达到太多并行执行,可能需要等待。 (如 SemaphoreSlim.Wait):

public Task WorkOn(IWorkItem workItem)
{
    _semaphoreSlim.Wait();
}

我目前的解决方案是:

public void Do(params IWorkItem[] workItems)
{
    var tasks = new Task[workItems.Length];
    for (var i = 0; i < workItems.Length; i++)
    {
        tasks[i] = WorkOn(workItems[i]);
    }
    Task.WaitAll(tasks);
}

问题:在这种情况下我可以使用某种方式 Parallel.ForEach 吗?为避免创建 10000 个任务后因 WorkOn 的节流而等待?

其实没那么容易。您可以使用 Parallel.ForEach 来限制生成的任务数量。但我不确定 perform/behave 你的情况如何。

作为一般经验法则,我通常尽量避免混合使用 TaskParallel

当然你可以这样做:

public void Do(params IWorkItem[] workItems)
{
    Parallel.ForEach(workItems, (workItem) => WorkOn(workItem).Wait());
}

在 "normal" 条件下,这应该可以很好地限制您的并发性。

您也可以完全 async-await 并使用一些技巧为您的并发添加一些限制。但是在那种情况下你必须自己做并发限制。

const int ConcurrencyLimit = 8;
public async Task Do(params IWorkItem[] workItems)
{
    var cursor = 0;
    var currentlyProcessing = new List<Task>(ConcurrencyLimit);
    while (cursor < workItems.Length)
    {
        while (currentlyProcessing.Count < ConcurrencyLimit && cursor < workItems.Length)
        {
            currentlyProcessing.Add(WorkOn(workItems[cursor]));
            cursor++;
        }

        Task finished = await Task.WhenAny(currentlyProcessing);
        currentlyProcessing.Remove(finished);
    }

    await Task.WhenAll(currentlyProcessing);
}

就像我说的……要复杂得多。但它也会将并发限制为您应用的任何值。此外,它还正确地使用了 async-await 模式。如果您不想要非阻塞多线程,您可以轻松地将此函数包装到另一个函数中,并对由此函数 return 编辑的任务执行阻塞 .Wait

此实现的关键是 Task.WhenAny 函数。此函数将 return 已完成的任务应用到任务列表中(由 await.

的另一任务包装)