要求按顺序 运行 等待子任务的并发处理(在控制台应用程序中)

Concurrent processing with the requirement of sequentially run awaited sub-tasks (in a console app)

我正在尝试找到处理并发处理项目的最佳方法,这些项目由需要按顺序执行的单个子任务组成 (C# / .net 4.6)

即并发处理列表中的对象,但按顺序执行一系列(异步等待)子任务 - 并且仅 运行 所有操作完成后出现的代码。

伪代码:

public async Task SynchronizeItems() 
{
   List<Items> items = await client.RetrieveItems();
   foreach (var item in collection) // but in parallel
   {
      await item.DoThingA()
      await item.DoThingB()
      await item.DoThingC()
   }   
   // **only run code here when all sub tasks for all items are complete**
}

更大的上下文(简化形式):我有一个可执行文件需要每 N 分钟 运行 作为计划任务。 Program的entrypoint/Main方法初始化一个ItemSyncService并调用SynchronizeItems()。因为 SynchronizeItems() 是异步的,所以当遇到第一个 await 时整个过程立即退出,因为控制返回到 Main 方法。

(简单地添加调用 SynchronizeItems().Wait() 不起作用,因为这是一个简化的场景。实际上,调用层次结构非常复杂,动态加载程序集和调用方法等.)

为了防止这种情况(基于我在 Stack Overflow 上看到的 post),我添加了一个 ManualResetEvent,这样我就可以手动控制何时 "all tasks are complete"。

static void Main(string[] args)
{
   ManualResetEvent completionEvent = new ManualResetEvent(false);
   _executor = new ItemService();
   _executor.SynchronizeItems(completionEvent)

   // wait for completion events to be set before exiting the method
   completionEvent.WaitOne()
}

同步方法如下所示:

public async Task SynchronizeItems(ManualResetEvent completionEvent) 
{
   List<Items> items = await client.RetrieveItems();
   foreach (var item in collection) // but in parallel
   {
      await item.DoThingA()
      await item.DoThingB()
      await item.DoThingC()
   }   
   // ** only run code here when all sub tasks for all items are complete**
   // signal completion
   completionEvent.Set()
}

将其更改为使用 Parallel ForEach 进行项目级并发,如下所示:

public async Task SynchronizeItems(ManualResetEvent completionEvent) 
{
   List<Items> items = await client.RetrieveItems();
   Parallel.ForEach(items, async (item) => // in parallel now
   {
      await item.DoThingA()
      await item.DoThingB()
      await item.DoThingC()
   }   
   **// only run code here when all sub tasks for all items are complete **
   // signal completion
   completionEvent.Set() // ** this now runs immediately without waiting **
}

但是,完成此操作后,将在开始每个项目的任务后立即调用 completionEvent。

我找到了一个实现 ParallelForEachAsync (https://github.com/Dasync/AsyncEnumerable) 的第 3 方库。看来这将阻止在所有项目的所有等待任务完成之前设置 completionEvent。

但我想知道我是否做错了? 最初当我写这篇文章时,我正在作为控制台应用程序进行测试并且在交互模式下有一个 ReadKey(),所以我没有 运行 进入异步相关的进程退出问题。

如果 DoThings 方法发出 I/O 请求而不是 CPU 绑定操作,那么您可以将项目处理移至它自己的方法:

private async Task ProcessItem(Items item) {
    await item.DoThingA();
    await item.DoThingB();
    await item.DoThingC();
}

然后为每个 Task 个对象构建一个列表,然后等待它们全部完成:

var taskList = new List<Task>();
foreach (var item in collection)
{
    taskList.Add(ProcessItem(item));
}
await Task.WhenAll(taskList);
// you will get here only when all the items are processed

await作用于一个不完整的Task时,它return自己的不完整Task和执行return调用方法。因此,如果 DoThingA() 发出网络请求,例如,一旦发送该网络请求,执行 returns 返回到 SynchronizeItems 方法并开始列表中的下一个.

所以这样做会立即开始所有的事情,然后当回复回来时,事情就结束了。 "finishing up" 可能会或可能不会发生在单独的线程上。这取决于应用程序的类型。

  • 在没有同步上下文的应用程序中(ASP.NET 核心,或控制台应用程序,或 Windows 服务)然后每个都将在后台线程上完成。

  • 如果此应用程序确实有同步上下文(ASP.NET 或桌面应用程序),那么每个应用程序都会等到主线程空闲后再完成。如果您知道您不需要上下文(例如,在 ASP.NET 中,您没有在其中任何一个或 UI 应用程序中使用 HttpContext,则您没有更改UI),那么你可以使用 ConfigureAwait(false) 告诉它你不需要 return 到它开始的相同上下文,它会更快完成:

private async Task ProcessItem(Items item) {
    await item.DoThingA().ConfigureAwait(false);
    await item.DoThingB().ConfigureAwait(false);
    await item.DoThingC().ConfigureAwait(false);
}

Microsoft 在 Asynchronous programming with async and await 上有一系列写得很好的文章。查看左侧的 table 内容,了解该部分的其余文章。