Linq 调用 task.Run 访问错误成员,延迟执行问题?
Linq calling task.Run accessing wrong member, deferred execution issue?
我在使用错误值的 linq 查询中遇到了一个奇怪的问题。我的代码看起来像这样
await Task.WhenAll((from item in itemsToProcess
let taskCount = count++
select Task.Run(() => { process(item).Result; }))
.AsParallel().ToArray());
基本上我有一个包含 50k 个项目的列表,这些项目被调用到一个进行网络调用的方法中。它们完全不相关,可以按任何顺序 运行,并且不访问任何共享的内容。但是,偶尔,非常随机地,它似乎将错误的项目传递给 process 方法,就像你在 foreach 循环中遇到的那样,如果你没有将它复制到局部变量。
如果我将我的代码更改为这个
await Task.WhenAll((from item in itemsToProcess
let taskCount = count++
let itemCopy = item
select Task.Run(() => { process(itemCopy).Result; }))
.AsParallel().ToArray());
那我好像没有这个问题。所以我的问题是,我是否遗漏了什么,或者这是预期的行为?我认为 linq 的 from 子句应该复制到本地副本,但事实并非如此?我很难找到任何直接解决这个问题的方法。但是我看到很多在 linq 表达式中调用异步方法而不做额外 let 的例子。
我也试过使 lambda 异步并等待该方法,但后来我 运行 进入了无线程的情况。也许有更好的方法来做到这一点?我很高兴知道这件事。简而言之,我所做的就是迭代一个列表并并行调用一个方法,因为它是 I/O 绑定而不是 cpu 绑定。另一种可能性是已经有大量关于此的帖子,而我只是在搜索错误的术语。如果是这样,我也很高兴知道。
并行和异步代码很少应该一起使用。 Parallel 理想情况下仅适用于 CPU 绑定代码。
为什么你不能这样做:
await Task.WhenAll(itemsToProcess.Select(item => process(item)));
根据评论编辑:
使用 SemaphoreSlim
:
可以(稍微)轻松地完成异步节流
static SemaphoreSlim throttle = new SemaphoreSlim(50);
static async Task ProcessAsync(Item item)
{
await throttle.WaitAsync();
try
{
... // Original process(item) code
}
finally
{
throttle.Release();
}
}
这会将项目处理限制为 50。这只是我从空中拉出的一个数字;您应该稍微试验一下以找到合适的值。
请注意,一旦工作变为异步,并行处理限制就会停止工作。异步工作没有 "take up" 线程,因此它不计入并行处理限制(或线程池注入率限制)。
我在使用错误值的 linq 查询中遇到了一个奇怪的问题。我的代码看起来像这样
await Task.WhenAll((from item in itemsToProcess
let taskCount = count++
select Task.Run(() => { process(item).Result; }))
.AsParallel().ToArray());
基本上我有一个包含 50k 个项目的列表,这些项目被调用到一个进行网络调用的方法中。它们完全不相关,可以按任何顺序 运行,并且不访问任何共享的内容。但是,偶尔,非常随机地,它似乎将错误的项目传递给 process 方法,就像你在 foreach 循环中遇到的那样,如果你没有将它复制到局部变量。
如果我将我的代码更改为这个
await Task.WhenAll((from item in itemsToProcess
let taskCount = count++
let itemCopy = item
select Task.Run(() => { process(itemCopy).Result; }))
.AsParallel().ToArray());
那我好像没有这个问题。所以我的问题是,我是否遗漏了什么,或者这是预期的行为?我认为 linq 的 from 子句应该复制到本地副本,但事实并非如此?我很难找到任何直接解决这个问题的方法。但是我看到很多在 linq 表达式中调用异步方法而不做额外 let 的例子。
我也试过使 lambda 异步并等待该方法,但后来我 运行 进入了无线程的情况。也许有更好的方法来做到这一点?我很高兴知道这件事。简而言之,我所做的就是迭代一个列表并并行调用一个方法,因为它是 I/O 绑定而不是 cpu 绑定。另一种可能性是已经有大量关于此的帖子,而我只是在搜索错误的术语。如果是这样,我也很高兴知道。
并行和异步代码很少应该一起使用。 Parallel 理想情况下仅适用于 CPU 绑定代码。
为什么你不能这样做:
await Task.WhenAll(itemsToProcess.Select(item => process(item)));
根据评论编辑:
使用 SemaphoreSlim
:
static SemaphoreSlim throttle = new SemaphoreSlim(50);
static async Task ProcessAsync(Item item)
{
await throttle.WaitAsync();
try
{
... // Original process(item) code
}
finally
{
throttle.Release();
}
}
这会将项目处理限制为 50。这只是我从空中拉出的一个数字;您应该稍微试验一下以找到合适的值。
请注意,一旦工作变为异步,并行处理限制就会停止工作。异步工作没有 "take up" 线程,因此它不计入并行处理限制(或线程池注入率限制)。