在 API 请求过程中使用 Parallel foreach?

Using Parallel foreach in an API request process?

在处理请求时使用 Parallel.ForEach 是否正确?我问这个是因为 async task 旨在尽可能多地满足更多请求,而不是尽可能快,而这正是 Parallel.ForEach 会做的。

简单示例:

public async Task<OperationResult> ProcessApiRequest(List<string> ids)
{
    Parallel.ForEach(ids, async (id) =>
            {         
                await this.doStuff(id);
                await this.doAnotherStuff(id);
            });

    return OperationResult.Success();
}

假设我可以收到 1 个 ID 或 100 万个 ID,并且我希望尽可能多地满足与会者的请求。由于我的线程将忙于处理 100 万个 ID,因此与会者的新请求会很困难,对吗?

谢谢!

您的担心是对的,Parallel.ForEach 默认情况下会尽可能多地使用线程池中的线程,线程池会逐渐扩展到它需要的最大线程数。 Task.Run 通常对于 Web 服务器来说不是一个好主意,Parallel.ForEach 通常更糟很多倍。

特别是考虑到 ids 是无限的,您可能很快就会遇到这样一种情况,即您的请求将排队,因为所有线程都忙于只满足少数请求。

所以你的担心是对的,这种代码正在针对非常低的规模优化单个请求的延迟,但在规模上会牺牲一个公平且性能良好的网络服务器,最终消除延迟的初始延迟胜利,并给您带来更广泛的服务问题。

Update - 正如 Panagiotis Kanavos 在评论中指出的那样,Parallel.ForEach 没有 Task 重载,因此只有 运行委托的初始同步部分,让大部分异步工作排队,你的 API 刚刚火了,可能在不知不觉中忘记了。

对于使用 ChannelReaderChannelWriter 的完全异步生产者消费者模式的替代版本,以及一些新的 C# 8.0 语法,您可以试试这个:

public async Task<OperationResult> ProcessApiRequest(List<string> ids)
{
    var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100) {SingleWriter = true});

    foreach (var id in ids)
    {
        await channel.Writer.WriteAsync(id); // If the back pressure exceeds 100 ids, we asynchronously wait here
    }
    channel.Writer.Complete();

    for (var i = 0; i < 8; i++) // 8 concurrent readers
    {
        _ = Task.Run(async () =>
        {
            await foreach (var id in channel.Reader.ReadAllAsync())
            {
                await this.doStuff(id);
                await this.doAnotherStuff(id);
            }
        });
    }

    return OperationResult.Success();
}

不,这不正确。 Parallel.ForEach 用于数据并行。它将创建与机器上的核心一样多的工作任务,对输入数据进行分区,并为每个分区使用一个工作人员。它对 async 操作一无所知,这意味着您的代码本质上是 :

Parallel.ForEach(ids, async void (int id) =>
        {         
            await this.doStuff(id);
            await this.doAnotherStuff(id);
        });

在 quad 机器上,这将触发 1M 请求,一次 4 个,而不等待任何请求。它可以很容易地 return 在任何请求有机会完成之前。

如果您想以受控方式执行多个请求,您可以使用例如具有特定并行度的 ActionBlock,例如:

var options=new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
    BoundedCapacity=100
}
var block=new ActionBlock<string>(async id=>{....},options);


foreach(var id in ids)
{
    await block.SendAsync(id);
}
block.Complete();
await block.Completion;

该块将处理最多 10 个并发请求。如果操作真的是异步的,或者异步等待时间很长,我们可以很容易地使用比可用内核数更高的 DOP。

输入消息被缓冲,这意味着我们最终可能会在慢速块的输入缓冲区中等待 1M 请求。为避免这种情况,如果块不能接受更多输入,BoundedCapacity 设置将块 SendAsync

最后,对 Complete() 的调用告诉块我们已经完成,它应该处理其输入缓冲区中的所有剩余消息。我们等待他们以 await block.Completion

结束