我可以在后台 运行 多个慢进程,以便多个任务可以 运行 并行吗?

Can I run multiple slow processes in the background so more than one task can run in parallel?

我在 Core .NET 2.2 框架的顶部使用 C# 编写了一个控制台应用程序。

我的应用程序允许我使用 Windows 任务调度程序触发长 运行 管理作业。

其中一项管理作业进行了 web-API 调用,该调用在将文件上传到 Azure Blob 存储之前下载了大量文件。以下是我的代码完成工作所需执行的逻辑步骤

  1. 调用远程 API 以 Mime 消息响应,其中每条消息代表一个文件。
  2. 解析 Mime 消息并将每条消息转换为 MemoryStream 创建 MemoryStream 集合

一旦我有一个包含多个 1000+ MemoryStream 的集合,我想将每个 Stream 写入 Azure Blob 存储。由于写入远程存储很慢,我希望我可以使用自己的进程或线程执行每个写入迭代。这将允许我同时并行地拥有 1000 多个线程 运行,而不必等待每个写入操作的结果。每个线程将负责记录在 write/upload 过程中可能发生的任何错误。任何记录的错误都将使用不同的作业处理,因此我不必担心重试。

我的理解是调用 writes/upload 流异步执行的代码。换句话说,我会说 "there is a Stream execute it and run for as long as it takes. I don't really care about the result as long as the task gets completed."

测试时发现自己对调用async的理解有些不对。我的印象是,当调用用 async 定义的方法时,将在后台执行 thread/worker 直到该过程完成。但是,当我测试代码时,我的理解失败了。我的代码告诉我,如果不添加关键字 awaitasync 代码就不会真正执行。同时,当加上关键字await时,代码会一直等到进程执行完毕后才继续执行。换句话说,根据我的需要添加await会破坏异步调用方法的目的。

这是我的代码的精简版本,用于解释我要完成的工作

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<MemoryStream>{.....};

    foreach (Stream file in files)
    {
        // This code should get executed in the background without having to await the result
        await Upload(file);
    }
}

// This method is responsible of upload a stream to a storage and log error if any
private async Task Upload(Stream stream)
{
    try
    {
        await Storage.Create(file, GetUniqueName());
    } 
    catch(Exception e)
    {
        // Log any errors
    }
}

从上面的代码中,调用 await Upload(file); 可以正常工作,并且会按预期上传文件。但是,由于我在调用 Upload() 方法时使用 await,因此在上传代码完成之前,我的循环不会跳转到下一次迭代。同时,删除 await 关键字,循环不会等待上传过程,但 Stream 从未实际写入存储,就好像我从未调用过代码一样。

如何并行执行多个 Upload 方法,以便后台每次上传有一个线程 运行?

将列表转换为 "Upload" 个任务的列表,并用 Task.WhenAll():

等待它们
public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<MemoryStream>{.....};
    var tasks = files.Select(Upload);

    await Task.WhenAll(tasks);
}

有关 tasks/await 的更多信息,请参阅

您可能需要这个:

var tasks = files.Select(Upload);
await Task.WhenAll(tasks);

请注意,它会生成与您拥有的文件一样多的任务,如果它们太多,可能会导致 process/machine 崩溃。请参阅 Have a set of Tasks with only X running at a time 作为如何解决该问题的示例。

I am hoping that I can execute each write iteration using its own process or thread.

这并不是最好的方法。进程和线程是有限的资源。您的限制因素 正在等待网络执行操作。

您要做的就是:

var tasks = new List<Task>(queue.Count);

while (queue.Count > 0)
{
  var myobject = Queue.Dequeue();
  var task = blockBlob.UploadFromByteArrayAsync(myobject.content, 0, myobject.content.Length);
  tasks.Add(task);
}
await Task.WhenAll(tasks);

这里我们只是尽可能快地创建任务,然后等待它们全部完成。我们将让 .Net 框架处理剩下的事情。

这里重要的是线程不会提高等待网络资源的速度。任务是一种将需要完成的工作委派给线程的方式,因此您可以有更多的线程来做任何事情(比如开始新的上传,或响应已完成的上传)。如果线程只是等待上传完成,那就是浪费资源。

其他答案都很好,但是另一种方法是 TPL DataFlow 可在 https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/

的 Nuget 中使用
public static async Task DoWorkLoads(List<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 50
                     };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result );

   block.Complete();
   await block.Completion;

}

...

public async Task MyMethodAsync(Something result)
{       
   //  Do async work here
}

dataflow的优势

  1. 它是否自然地与 async 一起工作,WhenAll 基于任务的解决方案
  2. 它也可以连接到更大的任务管道中
    • 您可以通过管道将错误重新输入来重试错误。
    • 将任何预处理调用添加到较早的块中
  3. 如果节流是一个问题,您可以限制 MaxDegreeOfParallelism
  4. 您可以制作更复杂的管道,因此得名 DataFlow

您可以将您的代码转换为 Azure Function 并让它让 Azure 处理大部分并行性、横向扩展和上传到 Azure Blob 存储工作。

您可以使用 Http 触发器或服务总线触发器来启动每个下载、处理和上传任务。