Parallel.ForEach 未按预期在 C# 的 ConcurrentBag 中添加项目

Parallel.ForEach not adding items as expected in ConcurrentBag in C#

在我的 Asp.Net Core WebApi Controller 中,我收到了 IFormFile[] files。我需要将其转换为 List<DocumentData>。我先用了foreach。它工作正常。但后来决定更改为 Parallel.ForEach,因为我收到了很多(>5)个文件。

这是我的 DocumentData Class:

public class DocumentData
{
    public byte[] BinaryData { get; set; }
    public string FileName { get; set; }
}

这是我的Parallel.ForEach逻辑:

var documents = new ConcurrentBag<DocumentData>();
Parallel.ForEach(files, async (currentFile) =>
{
    if (currentFile.Length > 0)
    {
        using (var ms = new MemoryStream())
        {
            await currentFile.CopyToAsync(ms);
            documents.Add(new DocumentData
            {
                BinaryData = ms.ToArray(),
                FileName = currentFile.FileName
            });
        }
    }
});

例如,即使输入两个文件,documents 也总是给出一个文件作为输出。我错过了什么吗?

我最初有 List<DocumentData>。发现不是线程安全的,改成了ConcurrentBag<DocumentData>。但我仍然得到意想不到的结果。请帮助我哪里错了?

我猜是因为Parallel.Foreach不支持async/await。它只需要 Action 作为输入并为每个项目执行它。在异步委托的情况下,它将以即发即弃的方式执行它们。 在这种情况下,传递的 lambda 将被视为 async void 函数并且无法等待 async void

如果有需要 Func<Task> 的过载,那么它将起作用。

我建议您在 Select 的帮助下创建 Tasks 并同时使用 Task.WhenAll 来执行它们。

例如:

var tasks = files.Select(async currentFile =>
{
    if (currentFile.Length > 0)
    {
        using (var ms = new MemoryStream())
        {
            await currentFile.CopyToAsync(ms);
            documents.Add(new DocumentData
            {
                BinaryData = ms.ToArray(),
                FileName = currentFile.FileName
            });
        }
    }
});

await Task.WhenAll(tasks);

此外,您可以通过从该方法返回 DocumentData 实例来改进该代码,在这种情况下,无需修改 documents 集合。 Task.WhenAll 具有以 IEnumerable<Task<TResult> 作为输入并生成 TResult 数组的 Task 的重载。所以,结果会是这样:

var tasks = files.Select(async currentFile =>
    {
        if (currentFile.Length > 0)
        {
            using (var ms = new MemoryStream())
            {
                await currentFile.CopyToAsync(ms);
                return new DocumentData
                {
                    BinaryData = ms.ToArray(),
                    FileName = currentFile.FileName
                };
            }
        }

        return null;
    });

var documents =  (await Task.WhenAll(tasks)).Where(d => d != null).ToArray();

您对 并发集合 的想法是正确的,但误用了 TPL 方法

简而言之,您需要非常小心 async lambdas,如果您将它们传递给 ActionFunc<Task>

你的问题是因为 Parallel.For / ForEach 不适合 异步和等待模式 IO 绑定任务 。它们适用于 cpu 绑定工作负载。这意味着它们基本上有 Action 参数,让 任务调度程序 为您创建 任务

如果你想 运行 同时使用多个任务,使用 Task.WhenAll ,或者 TPL Dataflow ActionBlock 可以有效处理CPU boundIO bound 工作负载,或者更直接地说,它们可以处理 tasks 这就是异步方法。

根本问题是,当您在 Action 上调用 async lambda 时,您实际上是在创建一个 async void 方法,它将 运行 作为 任务 未被观察到。也就是说,您的 TPL 方法 只是创建了一堆 任务 与 运行 一堆 并行未观察到的任务 而不是等待它们。

这样想,你叫一群朋友去给你买些杂货,他们又告诉其他人去拿你的杂货,但你的朋友向你报告说他们的工作完成了。显然不是,你没有杂货。