如何在任务异常处理数组中包含任务参数

How to include task parameter In array of tasks exception handling

线程 Nesting await in Parallel.ForEach 有一个答案建议使用 Task.WhenAll 到 运行 并行的多个 (MaxDegreeOfParallelism) 异步任务,而不是等到上一个任务完成。

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });
})); 
}

并称它为

ids.ForEachAsync(10,  async id =>
{
    ICustomerRepo repo = new CustomerRepo();  
    var cust = await repo.GetCustomer(id);  
    customers.Add(cust);  
});

如果body有参数,我想在处理exceptions.e.g时知道参数值。如果 id 的任务主体失败,我需要记录异常,指定它发生在特定的 id 上。

我看过 Accessing values in Task.ContinueWith,但在 t.IsFaulted 时无法访问参数。

最后,我在 lambda 主体中添加了 try/catch,它似乎起作用了

ids.ForEachAsync(10,  async id =>
{
    try
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = await repo.GetCustomer(id);
        customers.Add(cust);
    }
    catch(Exception e)
    {
        _logger.LogError(e,” id=“+ id);
    }
});

但是我不确定它是否正常工作(即异步,无阻塞)。

后来原始答案的作者 suggested 在 await body 之前使用 var current = partition.Current ,然后在延续中使用 current (ContinueWith(t => { ... })。 –

谁能证实,哪种方法更好?每种方法都有什么缺点?

try/catch 中包装 await 没问题,请参阅:Catch an exception thrown by an async method。与我的建议(捕获 partition.Current 并注入 ContinueWith 延续)没有太大区别,除了它可能更有效一些,因为不涉及捕获。我认为它也更具可读性和优雅性,ContinueWith 是一种 "old" 做事方式(pre async/await)。

请注意,您的示例将异常处理的负担传递给调用者(在本例中调用者调用 _logger.LogError)。您需要确保这就是您想要的,而不是嵌入在 ForEachAsync 代码本身中的包罗万象来处理调用者确实让异常溜走的情况。类似于:

while (partition.MoveNext()) 
{
    try
    {
        await body(partition.Current)
    }
    catch (Exception e)
    {
        // of course here you don't know the type of T (partition.Current)
        // or anything else about the operation for that matter
        LogError("error processing: " + partition.Current + ": " + e); 
    }
}