C#:使用 Parallel.ForEach 和异步操作限制并发操作的最大值
C#: limit maximum of concurrent operation with Parallel.ForEach and async Action
我正在尝试使用 asp.net 核心 2.1 实现自托管 Web 服务,但遇到了实现后台长时间执行任务的问题。
由于每个 ProcessSingle
方法(在下面的代码片段中)的高 CPU 负载和时间消耗,我想限制执行并发任务的数量。但是我可以看到 Parallel.ForEach
中的所有任务几乎立即开始,尽管我设置了 MaxDegreeOfParallelism = 3
我的代码是(简化版):
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Factory.StartNew(async () => {
Parallel.ForEach(
listOfData,
new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
async x => await ProcessSingle(x));
});
// return created id immediately
return id;
}
public static async Task ProcessSingle(MyInputData inputData)
{
var dbData = await GetDataFromDb(); // get data from DB async using Dapper
// some lasting processing (sync)
await SaveDataToDb(); // async save processed data to DB using Dapper
}
如果我没理解错的话,问题出在Parallel.ForEach里面的async x => await ProcessSingle(x)
,对吧?
有人可以描述一下,应该如何以正确的方式实施吗?
更新
由于我的问题有些含糊不清,有必要关注主要方面:
ProcessSingle
方法分为三部分:
从数据库异步获取数据
做长期高CPU-加载数学计算
将结果异步保存到数据库
问题由两个独立的部分组成:
如何减少 CPU 的使用(例如 运行 不超过三个数学同时计算)?
如何保持 ProcessSingle
方法的结构 - 由于异步数据库调用而使它们保持异步。
希望现在更清楚了。
P.S。已经给出了合适的答案,它有效(特别感谢@MatrixTai)。编写此更新是为了进行一般性说明。
更新
我刚刚注意到你在评论中提到,问题是由数学计算引起的。
把计算和更新DB的部分分开会更好
对于计算部分,使用Parallel.ForEach()
来优化你的工作,你可以控制线程数。
并且只有在所有这些任务完成之后。使用 async-await
将您的数据更新到数据库,而无需我提到的 SemaphoreSlim
。
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Run(async () => {
//Calculation Part
ConcurrentBag<int> data = new ConcurrentBag<int>();
Parallel.ForEach(
listOfData,
new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
x => {ConcurrentBag.Add(calculationPart(x))});
//Update DB part
int[] data_arr = data.ToArray();
List<Task> worker = new List<Task>();
foreach (var i in data_arr)
{
worker.Add(DBPart(x));
}
await Task.WhenAll(worker);
});
// return created id immediately
return id;
}
当您在 Parallel.forEach
.
中使用 async-await
时,肯定它们都是一起开始的
首先,阅读第一个和第二个答案的 question。将这两者结合起来是没有意义的。
实际上async-await
会最大化可用线程的使用,所以简单地使用它。
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Run(async () => {
List<Task> worker = new List<Task>();
foreach (var i in listOfData)
{
worker.Add(ProcessSingle(x));
}
await Task.WhenAll(worker);
});
// return created id immediately
return id;
}
但这是另一个问题,在这种情况下,这些任务仍然一起开始,占用了您的 CPU-usage。
所以要避免这种情况,请使用 SemaphoreSlim
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Run(async () => {
List<Task> worker = new List<Task>();
//To limit the number of Task started.
var throttler = new SemaphoreSlim(initialCount: 20);
foreach (var i in listOfData)
{
await throttler.WaitAsync();
worker.Add(Task.Run(async () =>
{
await ProcessSingle(x);
throttler.Release();
}
));
}
await Task.WhenAll(worker);
});
// return created id immediately
return id;
}
阅读更多How to limit the amount of concurrent async I/O operations?。
此外,当简单的 Task.Run()
足以完成您想要的工作时,请不要使用 Task.Factory.StartNew()
,请阅读 Stephen Cleary 撰写的这篇出色的 article。
如果您更熟悉 "traditional" 并行处理概念,请像这样重写您的 ProcessSingle() 方法:
public static void ProcessSingle(MyInputData inputData)
{
var dbData = GetDataFromDb(); // get data from DB async using Dapper
// some lasting processing (sync)
SaveDataToDb(); // async save processed data to DB using Dapper
}
当然,您最好也以类似的方式更改 Work() 方法。
我正在尝试使用 asp.net 核心 2.1 实现自托管 Web 服务,但遇到了实现后台长时间执行任务的问题。
由于每个 ProcessSingle
方法(在下面的代码片段中)的高 CPU 负载和时间消耗,我想限制执行并发任务的数量。但是我可以看到 Parallel.ForEach
中的所有任务几乎立即开始,尽管我设置了 MaxDegreeOfParallelism = 3
我的代码是(简化版):
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Factory.StartNew(async () => {
Parallel.ForEach(
listOfData,
new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
async x => await ProcessSingle(x));
});
// return created id immediately
return id;
}
public static async Task ProcessSingle(MyInputData inputData)
{
var dbData = await GetDataFromDb(); // get data from DB async using Dapper
// some lasting processing (sync)
await SaveDataToDb(); // async save processed data to DB using Dapper
}
如果我没理解错的话,问题出在Parallel.ForEach里面的async x => await ProcessSingle(x)
,对吧?
有人可以描述一下,应该如何以正确的方式实施吗?
更新
由于我的问题有些含糊不清,有必要关注主要方面:
ProcessSingle
方法分为三部分:从数据库异步获取数据
做长期高CPU-加载数学计算
将结果异步保存到数据库
问题由两个独立的部分组成:
如何减少 CPU 的使用(例如 运行 不超过三个数学同时计算)?
如何保持
ProcessSingle
方法的结构 - 由于异步数据库调用而使它们保持异步。
希望现在更清楚了。
P.S。已经给出了合适的答案,它有效(特别感谢@MatrixTai)。编写此更新是为了进行一般性说明。
更新
我刚刚注意到你在评论中提到,问题是由数学计算引起的。
把计算和更新DB的部分分开会更好
对于计算部分,使用Parallel.ForEach()
来优化你的工作,你可以控制线程数。
并且只有在所有这些任务完成之后。使用 async-await
将您的数据更新到数据库,而无需我提到的 SemaphoreSlim
。
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Run(async () => {
//Calculation Part
ConcurrentBag<int> data = new ConcurrentBag<int>();
Parallel.ForEach(
listOfData,
new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },
x => {ConcurrentBag.Add(calculationPart(x))});
//Update DB part
int[] data_arr = data.ToArray();
List<Task> worker = new List<Task>();
foreach (var i in data_arr)
{
worker.Add(DBPart(x));
}
await Task.WhenAll(worker);
});
// return created id immediately
return id;
}
当您在 Parallel.forEach
.
async-await
时,肯定它们都是一起开始的
首先,阅读第一个和第二个答案的 question。将这两者结合起来是没有意义的。
实际上async-await
会最大化可用线程的使用,所以简单地使用它。
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Run(async () => {
List<Task> worker = new List<Task>();
foreach (var i in listOfData)
{
worker.Add(ProcessSingle(x));
}
await Task.WhenAll(worker);
});
// return created id immediately
return id;
}
但这是另一个问题,在这种情况下,这些任务仍然一起开始,占用了您的 CPU-usage。
所以要避免这种情况,请使用 SemaphoreSlim
public static async Task<int> Work()
{
var id = await CreateIdInDB() // async create record in DB
// run background task, don't wait when it finishes
Task.Run(async () => {
List<Task> worker = new List<Task>();
//To limit the number of Task started.
var throttler = new SemaphoreSlim(initialCount: 20);
foreach (var i in listOfData)
{
await throttler.WaitAsync();
worker.Add(Task.Run(async () =>
{
await ProcessSingle(x);
throttler.Release();
}
));
}
await Task.WhenAll(worker);
});
// return created id immediately
return id;
}
阅读更多How to limit the amount of concurrent async I/O operations?。
此外,当简单的 Task.Run()
足以完成您想要的工作时,请不要使用 Task.Factory.StartNew()
,请阅读 Stephen Cleary 撰写的这篇出色的 article。
如果您更熟悉 "traditional" 并行处理概念,请像这样重写您的 ProcessSingle() 方法:
public static void ProcessSingle(MyInputData inputData)
{
var dbData = GetDataFromDb(); // get data from DB async using Dapper
// some lasting processing (sync)
SaveDataToDb(); // async save processed data to DB using Dapper
}
当然,您最好也以类似的方式更改 Work() 方法。