Parallel.ForEach 未按预期在 C# 的 ConcurrentBag 中添加项目
Parallel.ForEach not adding items as expected in ConcurrentBag in C#
在我的 Asp.Net Core WebApi Controller
中,我收到了 IFormFile[] files
。我需要将其转换为 List<DocumentData>
。我先用了foreach
。它工作正常。但后来决定更改为 Parallel.ForEach
,因为我收到了很多(>5)个文件。
这是我的 DocumentData
Class:
public class DocumentData
{
public byte[] BinaryData { get; set; }
public string FileName { get; set; }
}
这是我的Parallel.ForEach
逻辑:
var documents = new ConcurrentBag<DocumentData>();
Parallel.ForEach(files, async (currentFile) =>
{
if (currentFile.Length > 0)
{
using (var ms = new MemoryStream())
{
await currentFile.CopyToAsync(ms);
documents.Add(new DocumentData
{
BinaryData = ms.ToArray(),
FileName = currentFile.FileName
});
}
}
});
例如,即使输入两个文件,documents
也总是给出一个文件作为输出。我错过了什么吗?
我最初有 List<DocumentData>
。发现不是线程安全的,改成了ConcurrentBag<DocumentData>
。但我仍然得到意想不到的结果。请帮助我哪里错了?
我猜是因为Parallel.Foreach
不支持async/await
。它只需要 Action
作为输入并为每个项目执行它。在异步委托的情况下,它将以即发即弃的方式执行它们。
在这种情况下,传递的 lambda 将被视为 async void
函数并且无法等待 async void
。
如果有需要 Func<Task>
的过载,那么它将起作用。
我建议您在 Select
的帮助下创建 Task
s 并同时使用 Task.WhenAll
来执行它们。
例如:
var tasks = files.Select(async currentFile =>
{
if (currentFile.Length > 0)
{
using (var ms = new MemoryStream())
{
await currentFile.CopyToAsync(ms);
documents.Add(new DocumentData
{
BinaryData = ms.ToArray(),
FileName = currentFile.FileName
});
}
}
});
await Task.WhenAll(tasks);
此外,您可以通过从该方法返回 DocumentData
实例来改进该代码,在这种情况下,无需修改 documents
集合。 Task.WhenAll
具有以 IEnumerable<Task<TResult>
作为输入并生成 TResult
数组的 Task
的重载。所以,结果会是这样:
var tasks = files.Select(async currentFile =>
{
if (currentFile.Length > 0)
{
using (var ms = new MemoryStream())
{
await currentFile.CopyToAsync(ms);
return new DocumentData
{
BinaryData = ms.ToArray(),
FileName = currentFile.FileName
};
}
}
return null;
});
var documents = (await Task.WhenAll(tasks)).Where(d => d != null).ToArray();
您对 并发集合 的想法是正确的,但误用了 TPL 方法 。
简而言之,您需要非常小心 async lambdas,如果您将它们传递给 Action
或 Func<Task>
你的问题是因为 Parallel.For / ForEach
不适合 异步和等待模式 或 IO 绑定任务 。它们适用于 cpu 绑定工作负载。这意味着它们基本上有 Action
参数,让 任务调度程序 为您创建 任务
如果你想 运行 同时使用多个任务,使用 Task.WhenAll
,或者 TPL Dataflow ActionBlock
可以有效处理CPU bound 和 IO bound 工作负载,或者更直接地说,它们可以处理 tasks 这就是异步方法。
根本问题是,当您在 Action
上调用 async lambda 时,您实际上是在创建一个 async void
方法,它将 运行 作为 任务 未被观察到。也就是说,您的 TPL 方法 只是创建了一堆 任务 与 运行 一堆 并行未观察到的任务 而不是等待它们。
这样想,你叫一群朋友去给你买些杂货,他们又告诉其他人去拿你的杂货,但你的朋友向你报告说他们的工作完成了。显然不是,你没有杂货。
在我的 Asp.Net Core WebApi Controller
中,我收到了 IFormFile[] files
。我需要将其转换为 List<DocumentData>
。我先用了foreach
。它工作正常。但后来决定更改为 Parallel.ForEach
,因为我收到了很多(>5)个文件。
这是我的 DocumentData
Class:
public class DocumentData
{
public byte[] BinaryData { get; set; }
public string FileName { get; set; }
}
这是我的Parallel.ForEach
逻辑:
var documents = new ConcurrentBag<DocumentData>();
Parallel.ForEach(files, async (currentFile) =>
{
if (currentFile.Length > 0)
{
using (var ms = new MemoryStream())
{
await currentFile.CopyToAsync(ms);
documents.Add(new DocumentData
{
BinaryData = ms.ToArray(),
FileName = currentFile.FileName
});
}
}
});
例如,即使输入两个文件,documents
也总是给出一个文件作为输出。我错过了什么吗?
我最初有 List<DocumentData>
。发现不是线程安全的,改成了ConcurrentBag<DocumentData>
。但我仍然得到意想不到的结果。请帮助我哪里错了?
我猜是因为Parallel.Foreach
不支持async/await
。它只需要 Action
作为输入并为每个项目执行它。在异步委托的情况下,它将以即发即弃的方式执行它们。
在这种情况下,传递的 lambda 将被视为 async void
函数并且无法等待 async void
。
如果有需要 Func<Task>
的过载,那么它将起作用。
我建议您在 Select
的帮助下创建 Task
s 并同时使用 Task.WhenAll
来执行它们。
例如:
var tasks = files.Select(async currentFile =>
{
if (currentFile.Length > 0)
{
using (var ms = new MemoryStream())
{
await currentFile.CopyToAsync(ms);
documents.Add(new DocumentData
{
BinaryData = ms.ToArray(),
FileName = currentFile.FileName
});
}
}
});
await Task.WhenAll(tasks);
此外,您可以通过从该方法返回 DocumentData
实例来改进该代码,在这种情况下,无需修改 documents
集合。 Task.WhenAll
具有以 IEnumerable<Task<TResult>
作为输入并生成 TResult
数组的 Task
的重载。所以,结果会是这样:
var tasks = files.Select(async currentFile =>
{
if (currentFile.Length > 0)
{
using (var ms = new MemoryStream())
{
await currentFile.CopyToAsync(ms);
return new DocumentData
{
BinaryData = ms.ToArray(),
FileName = currentFile.FileName
};
}
}
return null;
});
var documents = (await Task.WhenAll(tasks)).Where(d => d != null).ToArray();
您对 并发集合 的想法是正确的,但误用了 TPL 方法 。
简而言之,您需要非常小心 async lambdas,如果您将它们传递给 Action
或 Func<Task>
你的问题是因为 Parallel.For / ForEach
不适合 异步和等待模式 或 IO 绑定任务 。它们适用于 cpu 绑定工作负载。这意味着它们基本上有 Action
参数,让 任务调度程序 为您创建 任务
如果你想 运行 同时使用多个任务,使用 Task.WhenAll
,或者 TPL Dataflow ActionBlock
可以有效处理CPU bound 和 IO bound 工作负载,或者更直接地说,它们可以处理 tasks 这就是异步方法。
根本问题是,当您在 Action
上调用 async lambda 时,您实际上是在创建一个 async void
方法,它将 运行 作为 任务 未被观察到。也就是说,您的 TPL 方法 只是创建了一堆 任务 与 运行 一堆 并行未观察到的任务 而不是等待它们。
这样想,你叫一群朋友去给你买些杂货,他们又告诉其他人去拿你的杂货,但你的朋友向你报告说他们的工作完成了。显然不是,你没有杂货。