异步并行任务的正确方法

Correct way to a-synchronize parallel tasks

目前我们有这段代码可以正常工作:

Result result1 = null;
Result result2 = null;

var task1 = Task.Factory.StartNew(()=>
{
   var records = DB.Read("..");
   //Do A lot
   result1 = Process(records);  
}); 

var task2 = Task.Factory.StartNew(()=>
{
   var records = DB.Read(".....");
   //Do A lot
   result2 = Process(records);  
});

Task.WaitAll(task1, task2);

var result = Combine(result1, result2);

现在我们想使用数据库函数的异步副本,我们正在使用这个新模式:

Result result1 = null;
Result result2 = null;

var task1 = await Task.Factory.StartNew( async ()=>
{
   var records = await DB.ReadAsync("..");
   //Do A lot
   result1 = Process(records);  
}); 

var task2 = await Task.Factory.StartNew(async ()=>
{
   var records = await DB.ReadAsync(".....");
   //Do A lot
   result2 = Process(records);  
});

Task.WaitAll(task1, task2);

var result = Combine(result1, result2);

切换到异步后,我们开始观察异常行为。所以我想知道这是否是并行化异步调用的正确模式?

Task.Factory.StartNew 是预异步 API。您应该使用 Task.Run,它在设计时考虑了异步等待:

var task1 = await Task.Run( async ()=>
{
   var records = await DB.ReadAsync("..");
   //Do A lot
   result1 = Process(records);  
});

问题是异步 lambda returns 一个 Task 所以 Task.Factory.StartNew returns 一个 Task<Task> (外层的因为 Task.Factory.StartNew returns 一个 Task 和内部一个是异步 lambda 的结果)。

这意味着当您等待 task1task2 时,您实际上并不是在等待整个操作,只是等待它的同步部分。

您可以通过在返回的 Task<Task>:

上使用 Task.Unwrap 来解决这个问题
Task<Task> task1 = await Task.Factory.StartNew(async ()=>
{
   var records = await DB.ReadAsync("..");
   //Do A lot
   result1 = Process(records);  
});

Task actualTask1 = task1.Unwrap();
await actualTask1;

但是 Task.Run 会暗中为您做到这一点。


作为旁注,您应该意识到您不需要 Task.Run 来同时执行这些操作。您可以通过调用这些方法并与 Task.When:

一起等待结果来做到这一点
async Task MainAsync()
{
    var task1 = FooAsync();
    var task2 = BarAsync();
    await Task.WhenAll(task1, task2);

    var result = Combine(task1.Result, task2.Result);
}

async Task<Result> FooAsync()
{
    var records = await DB.ReadAsync("..");
    //Do A lot
    return Process(records);  
}

async Task<Result> BarAsync()
{
    var records = await DB.ReadAsync(".....");
    //Do A lot
    return Process(records);
}

如果您甚至需要将这些方法的同步部分(第一个 await 之前的部分)卸载到 ThreadPool

,则只需要 Task.Run

Task.Factory.StartNew开始一个新的Task执行另一个独立的执行单元。因此,最简单的处理方法可能如下所示:

var task1 = Task.Factory.StartNew(()=> //NO AWAIT
{
   var records = DB.Read("....."); //NO ASYNC
   //Do A lot
   result1 = Process(records);  
});

... another task definition

Task.WaitAll(task1, task2);

在一项任务中顺序读取和处理,因为您有数据依赖性。

好吧,使用 .WaitAll 不是异步编程,因为您实际上是在等待当前线程。此外,您不调用 .Unwrap,这就是为什么您只等待异步 lambda 的创建,而不是异步 lambda 本身。

Task.Run 可以为您解包异步 lambda。但还有一种更简单、更干净的方法。

var task1 = DB.ReadAsync("..").ContinueWith(task => {
   //Do A lot
   return Process(task.Result);  
}, TaskScheduler.Default);

var task2 = DB.ReadAsync("..").ContinueWith(task => {
   //Do A lot
   return Process(task.Result);  
}, TaskScheduler.Default);

var result = Combine(await task1, await task2);

通过这种方式,您将在准备就绪时准确获得结果。所以你根本不需要额外的任务和变量。

请注意,ContinueWith 是一个棘手的函数,如果它不为空,它会在 TaskScheduler.Current 上运行,否则它会在线程池调度程序 TaskScheduler.Default 上运行。所以在调用这个函数时总是显式指定调度程序更安全。

另外,对于 claryfing,我没有包括错误检查,因为实际上 DB.ReadAsync 可以在错误的情况下完成。但这是一件容易的事,你可以自己处理。