在 API 请求过程中使用 Parallel foreach?
Using Parallel foreach in an API request process?
在处理请求时使用 Parallel.ForEach
是否正确?我问这个是因为 async task
旨在尽可能多地满足更多请求,而不是尽可能快,而这正是 Parallel.ForEach
会做的。
简单示例:
public async Task<OperationResult> ProcessApiRequest(List<string> ids)
{
Parallel.ForEach(ids, async (id) =>
{
await this.doStuff(id);
await this.doAnotherStuff(id);
});
return OperationResult.Success();
}
假设我可以收到 1 个 ID 或 100 万个 ID,并且我希望尽可能多地满足与会者的请求。由于我的线程将忙于处理 100 万个 ID,因此与会者的新请求会很困难,对吗?
谢谢!
您的担心是对的,Parallel.ForEach
默认情况下会尽可能多地使用线程池中的线程,线程池会逐渐扩展到它需要的最大线程数。 Task.Run
通常对于 Web 服务器来说不是一个好主意,Parallel.ForEach
通常更糟很多倍。
特别是考虑到 ids
是无限的,您可能很快就会遇到这样一种情况,即您的请求将排队,因为所有线程都忙于只满足少数请求。
所以你的担心是对的,这种代码正在针对非常低的规模优化单个请求的延迟,但在规模上会牺牲一个公平且性能良好的网络服务器,最终消除延迟的初始延迟胜利,并给您带来更广泛的服务问题。
Update - 正如 Panagiotis Kanavos 在评论中指出的那样,Parallel.ForEach
没有 Task
重载,因此只有 运行委托的初始同步部分,让大部分异步工作排队,你的 API 刚刚火了,可能在不知不觉中忘记了。
对于使用 ChannelReader
和 ChannelWriter
的完全异步生产者消费者模式的替代版本,以及一些新的 C# 8.0 语法,您可以试试这个:
public async Task<OperationResult> ProcessApiRequest(List<string> ids)
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100) {SingleWriter = true});
foreach (var id in ids)
{
await channel.Writer.WriteAsync(id); // If the back pressure exceeds 100 ids, we asynchronously wait here
}
channel.Writer.Complete();
for (var i = 0; i < 8; i++) // 8 concurrent readers
{
_ = Task.Run(async () =>
{
await foreach (var id in channel.Reader.ReadAllAsync())
{
await this.doStuff(id);
await this.doAnotherStuff(id);
}
});
}
return OperationResult.Success();
}
不,这不正确。 Parallel.ForEach
用于数据并行。它将创建与机器上的核心一样多的工作任务,对输入数据进行分区,并为每个分区使用一个工作人员。它对 async
操作一无所知,这意味着您的代码本质上是 :
Parallel.ForEach(ids, async void (int id) =>
{
await this.doStuff(id);
await this.doAnotherStuff(id);
});
在 quad 机器上,这将触发 1M 请求,一次 4 个,而不等待任何请求。它可以很容易地 return 在任何请求有机会完成之前。
如果您想以受控方式执行多个请求,您可以使用例如具有特定并行度的 ActionBlock,例如:
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10,
BoundedCapacity=100
}
var block=new ActionBlock<string>(async id=>{....},options);
foreach(var id in ids)
{
await block.SendAsync(id);
}
block.Complete();
await block.Completion;
该块将处理最多 10 个并发请求。如果操作真的是异步的,或者异步等待时间很长,我们可以很容易地使用比可用内核数更高的 DOP。
输入消息被缓冲,这意味着我们最终可能会在慢速块的输入缓冲区中等待 1M 请求。为避免这种情况,如果块不能接受更多输入,BoundedCapacity
设置将块 SendAsync
。
最后,对 Complete()
的调用告诉块我们已经完成,它应该处理其输入缓冲区中的所有剩余消息。我们等待他们以 await block.Completion
结束
在处理请求时使用 Parallel.ForEach
是否正确?我问这个是因为 async task
旨在尽可能多地满足更多请求,而不是尽可能快,而这正是 Parallel.ForEach
会做的。
简单示例:
public async Task<OperationResult> ProcessApiRequest(List<string> ids)
{
Parallel.ForEach(ids, async (id) =>
{
await this.doStuff(id);
await this.doAnotherStuff(id);
});
return OperationResult.Success();
}
假设我可以收到 1 个 ID 或 100 万个 ID,并且我希望尽可能多地满足与会者的请求。由于我的线程将忙于处理 100 万个 ID,因此与会者的新请求会很困难,对吗?
谢谢!
您的担心是对的,Parallel.ForEach
默认情况下会尽可能多地使用线程池中的线程,线程池会逐渐扩展到它需要的最大线程数。 Task.Run
通常对于 Web 服务器来说不是一个好主意,Parallel.ForEach
通常更糟很多倍。
特别是考虑到 ids
是无限的,您可能很快就会遇到这样一种情况,即您的请求将排队,因为所有线程都忙于只满足少数请求。
所以你的担心是对的,这种代码正在针对非常低的规模优化单个请求的延迟,但在规模上会牺牲一个公平且性能良好的网络服务器,最终消除延迟的初始延迟胜利,并给您带来更广泛的服务问题。
Update - 正如 Panagiotis Kanavos 在评论中指出的那样,Parallel.ForEach
没有 Task
重载,因此只有 运行委托的初始同步部分,让大部分异步工作排队,你的 API 刚刚火了,可能在不知不觉中忘记了。
对于使用 ChannelReader
和 ChannelWriter
的完全异步生产者消费者模式的替代版本,以及一些新的 C# 8.0 语法,您可以试试这个:
public async Task<OperationResult> ProcessApiRequest(List<string> ids)
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100) {SingleWriter = true});
foreach (var id in ids)
{
await channel.Writer.WriteAsync(id); // If the back pressure exceeds 100 ids, we asynchronously wait here
}
channel.Writer.Complete();
for (var i = 0; i < 8; i++) // 8 concurrent readers
{
_ = Task.Run(async () =>
{
await foreach (var id in channel.Reader.ReadAllAsync())
{
await this.doStuff(id);
await this.doAnotherStuff(id);
}
});
}
return OperationResult.Success();
}
不,这不正确。 Parallel.ForEach
用于数据并行。它将创建与机器上的核心一样多的工作任务,对输入数据进行分区,并为每个分区使用一个工作人员。它对 async
操作一无所知,这意味着您的代码本质上是 :
Parallel.ForEach(ids, async void (int id) =>
{
await this.doStuff(id);
await this.doAnotherStuff(id);
});
在 quad 机器上,这将触发 1M 请求,一次 4 个,而不等待任何请求。它可以很容易地 return 在任何请求有机会完成之前。
如果您想以受控方式执行多个请求,您可以使用例如具有特定并行度的 ActionBlock,例如:
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10,
BoundedCapacity=100
}
var block=new ActionBlock<string>(async id=>{....},options);
foreach(var id in ids)
{
await block.SendAsync(id);
}
block.Complete();
await block.Completion;
该块将处理最多 10 个并发请求。如果操作真的是异步的,或者异步等待时间很长,我们可以很容易地使用比可用内核数更高的 DOP。
输入消息被缓冲,这意味着我们最终可能会在慢速块的输入缓冲区中等待 1M 请求。为避免这种情况,如果块不能接受更多输入,BoundedCapacity
设置将块 SendAsync
。
最后,对 Complete()
的调用告诉块我们已经完成,它应该处理其输入缓冲区中的所有剩余消息。我们等待他们以 await block.Completion