Linq 调用 task.Run 访问错误成员,延迟执行问题?

Linq calling task.Run accessing wrong member, deferred execution issue?

我在使用错误值的 linq 查询中遇到了一个奇怪的问题。我的代码看起来像这样

await Task.WhenAll((from item in itemsToProcess
    let taskCount = count++
    select Task.Run(() => { process(item).Result; }))
    .AsParallel().ToArray());

基本上我有一个包含 50k 个项目的列表,这些项目被调用到一个进行网络调用的方法中。它们完全不相关,可以按任何顺序 运行,并且不访问任何共享的内容。但是,偶尔,非常随机地,它似乎将错误的项目传递给 process 方法,就像你在 foreach 循环中遇到的那样,如果你没有将它复制到局部变量。

如果我将我的代码更改为这个

await Task.WhenAll((from item in itemsToProcess
    let taskCount = count++
    let itemCopy = item
    select Task.Run(() => { process(itemCopy).Result; }))
    .AsParallel().ToArray());

那我好像没有这个问题。所以我的问题是,我是否遗漏了什么,或者这是预期的行为?我认为 linq 的 from 子句应该复制到本地副本,但事实并非如此?我很难找到任何直接解决这个问题的方法。但是我看到很多在 linq 表达式中调用异步方法而不做额外 let 的例子。

我也试过使 lambda 异步并等待该方法,但后来我 运行 进入了无线程的情况。也许有更好的方法来做到这一点?我很高兴知道这件事。简而言之,我所做的就是迭代一个列表并并行调用一个方法,因为它是 I/O 绑定而不是 cpu 绑定。另一种可能性是已经有大量关于此的帖子,而我只是在搜索错误的术语。如果是这样,我也很高兴知道。

并行和异步代码很少应该一起使用。 Parallel 理想情况下仅适用于 CPU 绑定代码。

为什么你不能这样做:

await Task.WhenAll(itemsToProcess.Select(item => process(item)));

根据评论编辑:

使用 SemaphoreSlim:

可以(稍微)轻松地完成异步节流
static SemaphoreSlim throttle = new SemaphoreSlim(50);
static async Task ProcessAsync(Item item)
{
  await throttle.WaitAsync();
  try
  {
    ... // Original process(item) code
  }
  finally
  {
    throttle.Release();
  }
}

这会将项目处理限制为 50。这只是我从空中拉出的一个数字;您应该稍微试验一下以找到合适的值。

请注意,一旦工作变为异步,并行处理限制就会停止工作。异步工作没有 "take up" 线程,因此它不计入并行处理限制(或线程池注入率限制)。