我可以在后台 运行 多个慢进程,以便多个任务可以 运行 并行吗?
Can I run multiple slow processes in the background so more than one task can run in parallel?
我在 Core .NET 2.2 框架的顶部使用 C#
编写了一个控制台应用程序。
我的应用程序允许我使用 Windows 任务调度程序触发长 运行 管理作业。
其中一项管理作业进行了 web-API 调用,该调用在将文件上传到 Azure Blob 存储之前下载了大量文件。以下是我的代码完成工作所需执行的逻辑步骤
- 调用远程 API 以 Mime 消息响应,其中每条消息代表一个文件。
- 解析 Mime 消息并将每条消息转换为
MemoryStream
创建 MemoryStream 集合
一旦我有一个包含多个 1000+ MemoryStream
的集合,我想将每个 Stream
写入 Azure Blob 存储。由于写入远程存储很慢,我希望我可以使用自己的进程或线程执行每个写入迭代。这将允许我同时并行地拥有 1000 多个线程 运行,而不必等待每个写入操作的结果。每个线程将负责记录在 write/upload 过程中可能发生的任何错误。任何记录的错误都将使用不同的作业处理,因此我不必担心重试。
我的理解是调用 writes/upload 流异步执行的代码。换句话说,我会说 "there is a Stream
execute it and run for as long as it takes. I don't really care about the result as long as the task gets completed."
测试时发现自己对调用async
的理解有些不对。我的印象是,当调用用 async
定义的方法时,将在后台执行 thread/worker 直到该过程完成。但是,当我测试代码时,我的理解失败了。我的代码告诉我,如果不添加关键字 await
,async
代码就不会真正执行。同时,当加上关键字await
时,代码会一直等到进程执行完毕后才继续执行。换句话说,根据我的需要添加await
会破坏异步调用方法的目的。
这是我的代码的精简版本,用于解释我要完成的工作
public async Task Run()
{
// This gets populated after calling the web-API and parsing out the result
List<Stream> files = new List<MemoryStream>{.....};
foreach (Stream file in files)
{
// This code should get executed in the background without having to await the result
await Upload(file);
}
}
// This method is responsible of upload a stream to a storage and log error if any
private async Task Upload(Stream stream)
{
try
{
await Storage.Create(file, GetUniqueName());
}
catch(Exception e)
{
// Log any errors
}
}
从上面的代码中,调用 await Upload(file);
可以正常工作,并且会按预期上传文件。但是,由于我在调用 Upload()
方法时使用 await
,因此在上传代码完成之前,我的循环不会跳转到下一次迭代。同时,删除 await
关键字,循环不会等待上传过程,但 Stream 从未实际写入存储,就好像我从未调用过代码一样。
如何并行执行多个 Upload
方法,以便后台每次上传有一个线程 运行?
将列表转换为 "Upload" 个任务的列表,并用 Task.WhenAll()
:
等待它们
public async Task Run()
{
// This gets populated after calling the web-API and parsing out the result
List<Stream> files = new List<MemoryStream>{.....};
var tasks = files.Select(Upload);
await Task.WhenAll(tasks);
}
有关 tasks/await 的更多信息,请参阅 。
您可能需要这个:
var tasks = files.Select(Upload);
await Task.WhenAll(tasks);
请注意,它会生成与您拥有的文件一样多的任务,如果它们太多,可能会导致 process/machine 崩溃。请参阅 Have a set of Tasks with only X running at a time 作为如何解决该问题的示例。
I am hoping that I can execute each write iteration using its own process or thread.
这并不是最好的方法。进程和线程是有限的资源。您的限制因素 正在等待网络执行操作。
您要做的就是:
var tasks = new List<Task>(queue.Count);
while (queue.Count > 0)
{
var myobject = Queue.Dequeue();
var task = blockBlob.UploadFromByteArrayAsync(myobject.content, 0, myobject.content.Length);
tasks.Add(task);
}
await Task.WhenAll(tasks);
这里我们只是尽可能快地创建任务,然后等待它们全部完成。我们将让 .Net 框架处理剩下的事情。
这里重要的是线程不会提高等待网络资源的速度。任务是一种将需要完成的工作委派给线程的方式,因此您可以有更多的线程来做任何事情(比如开始新的上传,或响应已完成的上传)。如果线程只是等待上传完成,那就是浪费资源。
其他答案都很好,但是另一种方法是 TPL DataFlow 可在 https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/
的 Nuget 中使用
public static async Task DoWorkLoads(List<Something> results)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 50
};
var block = new ActionBlock<Something>(MyMethodAsync, options);
foreach (var result in results)
block.Post(result );
block.Complete();
await block.Completion;
}
...
public async Task MyMethodAsync(Something result)
{
// Do async work here
}
dataflow的优势
- 它是否自然地与
async
一起工作,WhenAll
基于任务的解决方案
- 它也可以连接到更大的任务管道中
- 您可以通过管道将错误重新输入来重试错误。
- 将任何预处理调用添加到较早的块中
- 如果节流是一个问题,您可以限制
MaxDegreeOfParallelism
- 您可以制作更复杂的管道,因此得名 DataFlow
您可以将您的代码转换为 Azure Function 并让它让 Azure 处理大部分并行性、横向扩展和上传到 Azure Blob 存储工作。
您可以使用 Http 触发器或服务总线触发器来启动每个下载、处理和上传任务。
我在 Core .NET 2.2 框架的顶部使用 C#
编写了一个控制台应用程序。
我的应用程序允许我使用 Windows 任务调度程序触发长 运行 管理作业。
其中一项管理作业进行了 web-API 调用,该调用在将文件上传到 Azure Blob 存储之前下载了大量文件。以下是我的代码完成工作所需执行的逻辑步骤
- 调用远程 API 以 Mime 消息响应,其中每条消息代表一个文件。
- 解析 Mime 消息并将每条消息转换为
MemoryStream
创建 MemoryStream 集合
一旦我有一个包含多个 1000+ MemoryStream
的集合,我想将每个 Stream
写入 Azure Blob 存储。由于写入远程存储很慢,我希望我可以使用自己的进程或线程执行每个写入迭代。这将允许我同时并行地拥有 1000 多个线程 运行,而不必等待每个写入操作的结果。每个线程将负责记录在 write/upload 过程中可能发生的任何错误。任何记录的错误都将使用不同的作业处理,因此我不必担心重试。
我的理解是调用 writes/upload 流异步执行的代码。换句话说,我会说 "there is a Stream
execute it and run for as long as it takes. I don't really care about the result as long as the task gets completed."
测试时发现自己对调用async
的理解有些不对。我的印象是,当调用用 async
定义的方法时,将在后台执行 thread/worker 直到该过程完成。但是,当我测试代码时,我的理解失败了。我的代码告诉我,如果不添加关键字 await
,async
代码就不会真正执行。同时,当加上关键字await
时,代码会一直等到进程执行完毕后才继续执行。换句话说,根据我的需要添加await
会破坏异步调用方法的目的。
这是我的代码的精简版本,用于解释我要完成的工作
public async Task Run()
{
// This gets populated after calling the web-API and parsing out the result
List<Stream> files = new List<MemoryStream>{.....};
foreach (Stream file in files)
{
// This code should get executed in the background without having to await the result
await Upload(file);
}
}
// This method is responsible of upload a stream to a storage and log error if any
private async Task Upload(Stream stream)
{
try
{
await Storage.Create(file, GetUniqueName());
}
catch(Exception e)
{
// Log any errors
}
}
从上面的代码中,调用 await Upload(file);
可以正常工作,并且会按预期上传文件。但是,由于我在调用 Upload()
方法时使用 await
,因此在上传代码完成之前,我的循环不会跳转到下一次迭代。同时,删除 await
关键字,循环不会等待上传过程,但 Stream 从未实际写入存储,就好像我从未调用过代码一样。
如何并行执行多个 Upload
方法,以便后台每次上传有一个线程 运行?
将列表转换为 "Upload" 个任务的列表,并用 Task.WhenAll()
:
public async Task Run()
{
// This gets populated after calling the web-API and parsing out the result
List<Stream> files = new List<MemoryStream>{.....};
var tasks = files.Select(Upload);
await Task.WhenAll(tasks);
}
有关 tasks/await 的更多信息,请参阅
您可能需要这个:
var tasks = files.Select(Upload);
await Task.WhenAll(tasks);
请注意,它会生成与您拥有的文件一样多的任务,如果它们太多,可能会导致 process/machine 崩溃。请参阅 Have a set of Tasks with only X running at a time 作为如何解决该问题的示例。
I am hoping that I can execute each write iteration using its own process or thread.
这并不是最好的方法。进程和线程是有限的资源。您的限制因素 正在等待网络执行操作。
您要做的就是:
var tasks = new List<Task>(queue.Count);
while (queue.Count > 0)
{
var myobject = Queue.Dequeue();
var task = blockBlob.UploadFromByteArrayAsync(myobject.content, 0, myobject.content.Length);
tasks.Add(task);
}
await Task.WhenAll(tasks);
这里我们只是尽可能快地创建任务,然后等待它们全部完成。我们将让 .Net 框架处理剩下的事情。
这里重要的是线程不会提高等待网络资源的速度。任务是一种将需要完成的工作委派给线程的方式,因此您可以有更多的线程来做任何事情(比如开始新的上传,或响应已完成的上传)。如果线程只是等待上传完成,那就是浪费资源。
其他答案都很好,但是另一种方法是 TPL DataFlow 可在 https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/
的 Nuget 中使用public static async Task DoWorkLoads(List<Something> results)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 50
};
var block = new ActionBlock<Something>(MyMethodAsync, options);
foreach (var result in results)
block.Post(result );
block.Complete();
await block.Completion;
}
...
public async Task MyMethodAsync(Something result)
{
// Do async work here
}
dataflow的优势
- 它是否自然地与
async
一起工作,WhenAll
基于任务的解决方案 - 它也可以连接到更大的任务管道中
- 您可以通过管道将错误重新输入来重试错误。
- 将任何预处理调用添加到较早的块中
- 如果节流是一个问题,您可以限制
MaxDegreeOfParallelism
- 您可以制作更复杂的管道,因此得名 DataFlow
您可以将您的代码转换为 Azure Function 并让它让 Azure 处理大部分并行性、横向扩展和上传到 Azure Blob 存储工作。
您可以使用 Http 触发器或服务总线触发器来启动每个下载、处理和上传任务。