异步并行任务的正确方法
Correct way to a-synchronize parallel tasks
目前我们有这段代码可以正常工作:
Result result1 = null;
Result result2 = null;
var task1 = Task.Factory.StartNew(()=>
{
var records = DB.Read("..");
//Do A lot
result1 = Process(records);
});
var task2 = Task.Factory.StartNew(()=>
{
var records = DB.Read(".....");
//Do A lot
result2 = Process(records);
});
Task.WaitAll(task1, task2);
var result = Combine(result1, result2);
现在我们想使用数据库函数的异步副本,我们正在使用这个新模式:
Result result1 = null;
Result result2 = null;
var task1 = await Task.Factory.StartNew( async ()=>
{
var records = await DB.ReadAsync("..");
//Do A lot
result1 = Process(records);
});
var task2 = await Task.Factory.StartNew(async ()=>
{
var records = await DB.ReadAsync(".....");
//Do A lot
result2 = Process(records);
});
Task.WaitAll(task1, task2);
var result = Combine(result1, result2);
切换到异步后,我们开始观察异常行为。所以我想知道这是否是并行化异步调用的正确模式?
Task.Factory.StartNew
是预异步 API。您应该使用 Task.Run
,它在设计时考虑了异步等待:
var task1 = await Task.Run( async ()=>
{
var records = await DB.ReadAsync("..");
//Do A lot
result1 = Process(records);
});
问题是异步 lambda returns 一个 Task
所以 Task.Factory.StartNew
returns 一个 Task<Task>
(外层的因为 Task.Factory.StartNew
returns 一个 Task
和内部一个是异步 lambda 的结果)。
这意味着当您等待 task1
和 task2
时,您实际上并不是在等待整个操作,只是等待它的同步部分。
您可以通过在返回的 Task<Task>
:
上使用 Task.Unwrap
来解决这个问题
Task<Task> task1 = await Task.Factory.StartNew(async ()=>
{
var records = await DB.ReadAsync("..");
//Do A lot
result1 = Process(records);
});
Task actualTask1 = task1.Unwrap();
await actualTask1;
但是 Task.Run
会暗中为您做到这一点。
作为旁注,您应该意识到您不需要 Task.Run
来同时执行这些操作。您可以通过调用这些方法并与 Task.When
:
一起等待结果来做到这一点
async Task MainAsync()
{
var task1 = FooAsync();
var task2 = BarAsync();
await Task.WhenAll(task1, task2);
var result = Combine(task1.Result, task2.Result);
}
async Task<Result> FooAsync()
{
var records = await DB.ReadAsync("..");
//Do A lot
return Process(records);
}
async Task<Result> BarAsync()
{
var records = await DB.ReadAsync(".....");
//Do A lot
return Process(records);
}
如果您甚至需要将这些方法的同步部分(第一个 await
之前的部分)卸载到 ThreadPool
。
,则只需要 Task.Run
Task.Factory.StartNew
开始一个新的Task执行另一个独立的执行单元。因此,最简单的处理方法可能如下所示:
var task1 = Task.Factory.StartNew(()=> //NO AWAIT
{
var records = DB.Read("....."); //NO ASYNC
//Do A lot
result1 = Process(records);
});
... another task definition
Task.WaitAll(task1, task2);
在一项任务中顺序读取和处理,因为您有数据依赖性。
好吧,使用 .WaitAll 不是异步编程,因为您实际上是在等待当前线程。此外,您不调用 .Unwrap,这就是为什么您只等待异步 lambda 的创建,而不是异步 lambda 本身。
Task.Run 可以为您解包异步 lambda。但还有一种更简单、更干净的方法。
var task1 = DB.ReadAsync("..").ContinueWith(task => {
//Do A lot
return Process(task.Result);
}, TaskScheduler.Default);
var task2 = DB.ReadAsync("..").ContinueWith(task => {
//Do A lot
return Process(task.Result);
}, TaskScheduler.Default);
var result = Combine(await task1, await task2);
通过这种方式,您将在准备就绪时准确获得结果。所以你根本不需要额外的任务和变量。
请注意,ContinueWith 是一个棘手的函数,如果它不为空,它会在 TaskScheduler.Current 上运行,否则它会在线程池调度程序 TaskScheduler.Default 上运行。所以在调用这个函数时总是显式指定调度程序更安全。
另外,对于 claryfing,我没有包括错误检查,因为实际上 DB.ReadAsync 可以在错误的情况下完成。但这是一件容易的事,你可以自己处理。
目前我们有这段代码可以正常工作:
Result result1 = null;
Result result2 = null;
var task1 = Task.Factory.StartNew(()=>
{
var records = DB.Read("..");
//Do A lot
result1 = Process(records);
});
var task2 = Task.Factory.StartNew(()=>
{
var records = DB.Read(".....");
//Do A lot
result2 = Process(records);
});
Task.WaitAll(task1, task2);
var result = Combine(result1, result2);
现在我们想使用数据库函数的异步副本,我们正在使用这个新模式:
Result result1 = null;
Result result2 = null;
var task1 = await Task.Factory.StartNew( async ()=>
{
var records = await DB.ReadAsync("..");
//Do A lot
result1 = Process(records);
});
var task2 = await Task.Factory.StartNew(async ()=>
{
var records = await DB.ReadAsync(".....");
//Do A lot
result2 = Process(records);
});
Task.WaitAll(task1, task2);
var result = Combine(result1, result2);
切换到异步后,我们开始观察异常行为。所以我想知道这是否是并行化异步调用的正确模式?
Task.Factory.StartNew
是预异步 API。您应该使用 Task.Run
,它在设计时考虑了异步等待:
var task1 = await Task.Run( async ()=>
{
var records = await DB.ReadAsync("..");
//Do A lot
result1 = Process(records);
});
问题是异步 lambda returns 一个 Task
所以 Task.Factory.StartNew
returns 一个 Task<Task>
(外层的因为 Task.Factory.StartNew
returns 一个 Task
和内部一个是异步 lambda 的结果)。
这意味着当您等待 task1
和 task2
时,您实际上并不是在等待整个操作,只是等待它的同步部分。
您可以通过在返回的 Task<Task>
:
Task.Unwrap
来解决这个问题
Task<Task> task1 = await Task.Factory.StartNew(async ()=>
{
var records = await DB.ReadAsync("..");
//Do A lot
result1 = Process(records);
});
Task actualTask1 = task1.Unwrap();
await actualTask1;
但是 Task.Run
会暗中为您做到这一点。
作为旁注,您应该意识到您不需要 Task.Run
来同时执行这些操作。您可以通过调用这些方法并与 Task.When
:
async Task MainAsync()
{
var task1 = FooAsync();
var task2 = BarAsync();
await Task.WhenAll(task1, task2);
var result = Combine(task1.Result, task2.Result);
}
async Task<Result> FooAsync()
{
var records = await DB.ReadAsync("..");
//Do A lot
return Process(records);
}
async Task<Result> BarAsync()
{
var records = await DB.ReadAsync(".....");
//Do A lot
return Process(records);
}
如果您甚至需要将这些方法的同步部分(第一个 await
之前的部分)卸载到 ThreadPool
。
Task.Run
Task.Factory.StartNew
开始一个新的Task执行另一个独立的执行单元。因此,最简单的处理方法可能如下所示:
var task1 = Task.Factory.StartNew(()=> //NO AWAIT
{
var records = DB.Read("....."); //NO ASYNC
//Do A lot
result1 = Process(records);
});
... another task definition
Task.WaitAll(task1, task2);
在一项任务中顺序读取和处理,因为您有数据依赖性。
好吧,使用 .WaitAll 不是异步编程,因为您实际上是在等待当前线程。此外,您不调用 .Unwrap,这就是为什么您只等待异步 lambda 的创建,而不是异步 lambda 本身。
Task.Run 可以为您解包异步 lambda。但还有一种更简单、更干净的方法。
var task1 = DB.ReadAsync("..").ContinueWith(task => {
//Do A lot
return Process(task.Result);
}, TaskScheduler.Default);
var task2 = DB.ReadAsync("..").ContinueWith(task => {
//Do A lot
return Process(task.Result);
}, TaskScheduler.Default);
var result = Combine(await task1, await task2);
通过这种方式,您将在准备就绪时准确获得结果。所以你根本不需要额外的任务和变量。
请注意,ContinueWith 是一个棘手的函数,如果它不为空,它会在 TaskScheduler.Current 上运行,否则它会在线程池调度程序 TaskScheduler.Default 上运行。所以在调用这个函数时总是显式指定调度程序更安全。
另外,对于 claryfing,我没有包括错误检查,因为实际上 DB.ReadAsync 可以在错误的情况下完成。但这是一件容易的事,你可以自己处理。