如何在任务异常处理数组中包含任务参数
How to include task parameter In array of tasks exception handling
线程 Nesting await in Parallel.ForEach 有一个答案建议使用 Task.WhenAll 到 运行 并行的多个 (MaxDegreeOfParallelism) 异步任务,而不是等到上一个任务完成。
public static Task ForEachAsync<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current).ContinueWith(t =>
{
//observe exceptions
});
}));
}
并称它为
ids.ForEachAsync(10, async id =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(id);
customers.Add(cust);
});
如果body有参数,我想在处理exceptions.e.g时知道参数值。如果 id 的任务主体失败,我需要记录异常,指定它发生在特定的 id 上。
我看过
Accessing values in Task.ContinueWith,但在 t.IsFaulted 时无法访问参数。
最后,我在 lambda 主体中添加了 try/catch,它似乎起作用了
ids.ForEachAsync(10, async id =>
{
try
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(id);
customers.Add(cust);
}
catch(Exception e)
{
_logger.LogError(e,” id=“+ id);
}
});
但是我不确定它是否正常工作(即异步,无阻塞)。
后来原始答案的作者 suggested 在 await body 之前使用 var current = partition.Current ,然后在延续中使用 current (ContinueWith(t => { ... })。 –
谁能证实,哪种方法更好?每种方法都有什么缺点?
在 try
/catch
中包装 await
没问题,请参阅:Catch an exception thrown by an async method。与我的建议(捕获 partition.Current
并注入 ContinueWith
延续)没有太大区别,除了它可能更有效一些,因为不涉及捕获。我认为它也更具可读性和优雅性,ContinueWith
是一种 "old" 做事方式(pre async
/await
)。
请注意,您的示例将异常处理的负担传递给调用者(在本例中调用者调用 _logger.LogError
)。您需要确保这就是您想要的,而不是嵌入在 ForEachAsync
代码本身中的包罗万象来处理调用者确实让异常溜走的情况。类似于:
while (partition.MoveNext())
{
try
{
await body(partition.Current)
}
catch (Exception e)
{
// of course here you don't know the type of T (partition.Current)
// or anything else about the operation for that matter
LogError("error processing: " + partition.Current + ": " + e);
}
}
线程 Nesting await in Parallel.ForEach 有一个答案建议使用 Task.WhenAll 到 运行 并行的多个 (MaxDegreeOfParallelism) 异步任务,而不是等到上一个任务完成。
public static Task ForEachAsync<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current).ContinueWith(t =>
{
//observe exceptions
});
}));
}
并称它为
ids.ForEachAsync(10, async id =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(id);
customers.Add(cust);
});
如果body有参数,我想在处理exceptions.e.g时知道参数值。如果 id 的任务主体失败,我需要记录异常,指定它发生在特定的 id 上。
我看过 Accessing values in Task.ContinueWith,但在 t.IsFaulted 时无法访问参数。
最后,我在 lambda 主体中添加了 try/catch,它似乎起作用了
ids.ForEachAsync(10, async id =>
{
try
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(id);
customers.Add(cust);
}
catch(Exception e)
{
_logger.LogError(e,” id=“+ id);
}
});
但是我不确定它是否正常工作(即异步,无阻塞)。
后来原始答案的作者 suggested 在 await body 之前使用 var current = partition.Current ,然后在延续中使用 current (ContinueWith(t => { ... })。 –
谁能证实,哪种方法更好?每种方法都有什么缺点?
在 try
/catch
中包装 await
没问题,请参阅:Catch an exception thrown by an async method。与我的建议(捕获 partition.Current
并注入 ContinueWith
延续)没有太大区别,除了它可能更有效一些,因为不涉及捕获。我认为它也更具可读性和优雅性,ContinueWith
是一种 "old" 做事方式(pre async
/await
)。
请注意,您的示例将异常处理的负担传递给调用者(在本例中调用者调用 _logger.LogError
)。您需要确保这就是您想要的,而不是嵌入在 ForEachAsync
代码本身中的包罗万象来处理调用者确实让异常溜走的情况。类似于:
while (partition.MoveNext())
{
try
{
await body(partition.Current)
}
catch (Exception e)
{
// of course here you don't know the type of T (partition.Current)
// or anything else about the operation for that matter
LogError("error processing: " + partition.Current + ": " + e);
}
}