如何限制多个异步任务?
How to throttle multiple asynchronous tasks?
我有一些如下形式的代码:
static async Task DoSomething(int n)
{
...
}
static void RunThreads(int totalThreads, int throttle)
{
var tasks = new List<Task>();
for (var n = 0; n < totalThreads; n++)
{
var task = DoSomething(n);
tasks.Add(task);
}
Task.WhenAll(tasks).Wait(); // all threads must complete
}
问题是,如果我不限制线程,事情就会开始崩溃。现在,我想启动最多 throttle
个线程,并且只在旧线程完成后才启动新线程。我已经尝试了一些方法并且 none 到目前为止已经奏效了。我遇到的问题包括:
tasks
集合必须完全填充所有任务,无论是活动的还是等待执行的,否则最终的 .Wait()
调用只会查看它开始的线程。
- 链接执行似乎需要使用
Task.Run()
等。但是我需要从一开始就引用每个任务,并且实例化一个任务似乎会自动启动它,这是我不想要的。
如何操作?
如果我理解正确,您可以启动 throttle
参数中提到的有限数量的任务,并等待它们完成,然后再开始下一步..
要在开始新任务之前等待所有已启动的任务完成,请使用以下实现。
static async Task RunThreads(int totalThreads, int throttle)
{
var tasks = new List<Task>();
for (var n = 0; n < totalThreads; n++)
{
var task = DoSomething(n);
tasks.Add(task);
if (tasks.Count == throttle)
{
await Task.WhenAll(tasks);
tasks.Clear();
}
}
await Task.WhenAll(tasks); // wait for remaining
}
要在完成时添加任务,您可以使用以下代码
static async Task RunThreads(int totalThreads, int throttle)
{
var tasks = new List<Task>();
for (var n = 0; n < totalThreads; n++)
{
var task = DoSomething(n);
tasks.Add(task);
if (tasks.Count == throttle)
{
var completed = await Task.WhenAny(tasks);
tasks.Remove(completed);
}
}
await Task.WhenAll(tasks); // all threads must complete
}
Stephen Toub 在他的 The Task-based Asynchronous Pattern 文档中给出了以下节流示例。
const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch(Exception exc) { Log(exc); }
if (nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
}
Microsoft 的 Reactive Extensions (Rx) - NuGet "Rx-Main" - 很好地解决了这个问题。
只需这样做:
static void RunThreads(int totalThreads, int throttle)
{
Observable
.Range(0, totalThreads)
.Select(n => Observable.FromAsync(() => DoSomething(n)))
.Merge(throttle)
.Wait();
}
任务完成。
IMO 最简单的选择是使用 TPL 数据流。您只需创建一个 ActionBLock
,通过所需的并行度限制它并开始将项目发布到其中。它确保同时只运行一定数量的任务,当一个任务完成时,它开始执行下一个项目:
async Task RunAsync(int totalThreads, int throttle)
{
var block = new ActionBlock<int>(
DoSomething,
new ExecutionDataFlowOptions { MaxDegreeOfParallelism = throttle });
for (var n = 0; n < totalThreads; n++)
{
block.Post(n);
}
block.Complete();
await block.Completion;
}
这里有一些基于 Sriram Sakthivel 答案的扩展方法变体。
在用法示例中,对 DoSomething
的调用被包装在一个显式转换闭包中以允许传递参数。
public static async Task RunMyThrottledTasks()
{
var myArgsSource = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
await myArgsSource
.Select(a => (Func<Task<object>>)(() => DoSomething(a)))
.Throttle(2);
}
public static async Task<object> DoSomething(int arg)
{
// Await some async calls that need arg..
// ..then return result async Task..
return new object();
}
public static async Task<IEnumerable<T>> Throttle<T>(IEnumerable<Func<Task<T>>> toRun, int throttleTo)
{
var running = new List<Task<T>>(throttleTo);
var completed = new List<Task<T>>(toRun.Count());
foreach(var taskToRun in toRun)
{
running.Add(taskToRun());
if(running.Count == throttleTo)
{
var comTask = await Task.WhenAny(running);
running.Remove(comTask);
completed.Add(comTask);
}
}
return completed.Select(t => t.Result);
}
public static async Task Throttle(this IEnumerable<Func<Task>> toRun, int throttleTo)
{
var running = new List<Task>(throttleTo);
foreach(var taskToRun in toRun)
{
running.Add(taskToRun());
if(running.Count == throttleTo)
{
var comTask = await Task.WhenAny(running);
running.Remove(comTask);
}
}
}
首先,从线程中抽象出来。特别是因为您的操作是异步的,所以您根本不应该考虑 "threads" 。在异步世界中,您有任务,与线程相比,您可以拥有 巨大 数量的任务。
可以使用 SemaphoreSlim
:
来限制异步代码
static async Task DoSomething(int n);
static void RunConcurrently(int total, int throttle)
{
var mutex = new SemaphoreSlim(throttle);
var tasks = Enumerable.Range(0, total).Select(async item =>
{
await mutex.WaitAsync();
try { await DoSomething(item); }
finally { mutex.Release(); }
});
Task.WhenAll(tasks).Wait();
}
您需要的是自定义任务计划程序。您可以从 System.Threading.Tasks.TaskScheduler 派生一个 class 并实现两个主要函数 GetScheduledTasks()
、QueueTask()
以及其他函数来获得对节流任务的完全控制。这是一个有据可查的示例。
https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0
您实际上可以模拟作为 .NET 6 的一部分引入的 Parallel.ForEachAsync
方法。为了模拟相同的方法,您可以使用以下代码。
public static Task ForEachAsync<T>(IEnumerable<T> source, int maxDegreeOfParallelism, Func<T, Task> body) {
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
.NET 6 引入了 Parallel.ForEachAsync
。您可以这样重写代码:
static async ValueTask DoSomething(int n)
{
...
}
static Task RunThreads(int totalThreads, int throttle)
=> Parallel.ForEachAsync(Enumerable.Range(0, totalThreads), new ParallelOptions() { MaxDegreeOfParallelism = throttle }, (i, _) => DoSomething(i));
备注:
- 我不得不将您的
DoSomething
函数的 return 类型从 Task
更改为 ValueTask
。
- 您可能想避免
.Wait()
调用,所以我将 RunThreads
方法设为异步。
- 从您的示例中看不出为什么您需要访问各个任务。此代码不会让您访问任务,但在许多情况下可能仍然有用。
我有一些如下形式的代码:
static async Task DoSomething(int n)
{
...
}
static void RunThreads(int totalThreads, int throttle)
{
var tasks = new List<Task>();
for (var n = 0; n < totalThreads; n++)
{
var task = DoSomething(n);
tasks.Add(task);
}
Task.WhenAll(tasks).Wait(); // all threads must complete
}
问题是,如果我不限制线程,事情就会开始崩溃。现在,我想启动最多 throttle
个线程,并且只在旧线程完成后才启动新线程。我已经尝试了一些方法并且 none 到目前为止已经奏效了。我遇到的问题包括:
tasks
集合必须完全填充所有任务,无论是活动的还是等待执行的,否则最终的.Wait()
调用只会查看它开始的线程。- 链接执行似乎需要使用
Task.Run()
等。但是我需要从一开始就引用每个任务,并且实例化一个任务似乎会自动启动它,这是我不想要的。
如何操作?
如果我理解正确,您可以启动 throttle
参数中提到的有限数量的任务,并等待它们完成,然后再开始下一步..
要在开始新任务之前等待所有已启动的任务完成,请使用以下实现。
static async Task RunThreads(int totalThreads, int throttle)
{
var tasks = new List<Task>();
for (var n = 0; n < totalThreads; n++)
{
var task = DoSomething(n);
tasks.Add(task);
if (tasks.Count == throttle)
{
await Task.WhenAll(tasks);
tasks.Clear();
}
}
await Task.WhenAll(tasks); // wait for remaining
}
要在完成时添加任务,您可以使用以下代码
static async Task RunThreads(int totalThreads, int throttle)
{
var tasks = new List<Task>();
for (var n = 0; n < totalThreads; n++)
{
var task = DoSomething(n);
tasks.Add(task);
if (tasks.Count == throttle)
{
var completed = await Task.WhenAny(tasks);
tasks.Remove(completed);
}
}
await Task.WhenAll(tasks); // all threads must complete
}
Stephen Toub 在他的 The Task-based Asynchronous Pattern 文档中给出了以下节流示例。
const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch(Exception exc) { Log(exc); }
if (nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
}
Microsoft 的 Reactive Extensions (Rx) - NuGet "Rx-Main" - 很好地解决了这个问题。
只需这样做:
static void RunThreads(int totalThreads, int throttle)
{
Observable
.Range(0, totalThreads)
.Select(n => Observable.FromAsync(() => DoSomething(n)))
.Merge(throttle)
.Wait();
}
任务完成。
IMO 最简单的选择是使用 TPL 数据流。您只需创建一个 ActionBLock
,通过所需的并行度限制它并开始将项目发布到其中。它确保同时只运行一定数量的任务,当一个任务完成时,它开始执行下一个项目:
async Task RunAsync(int totalThreads, int throttle)
{
var block = new ActionBlock<int>(
DoSomething,
new ExecutionDataFlowOptions { MaxDegreeOfParallelism = throttle });
for (var n = 0; n < totalThreads; n++)
{
block.Post(n);
}
block.Complete();
await block.Completion;
}
这里有一些基于 Sriram Sakthivel 答案的扩展方法变体。
在用法示例中,对 DoSomething
的调用被包装在一个显式转换闭包中以允许传递参数。
public static async Task RunMyThrottledTasks()
{
var myArgsSource = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
await myArgsSource
.Select(a => (Func<Task<object>>)(() => DoSomething(a)))
.Throttle(2);
}
public static async Task<object> DoSomething(int arg)
{
// Await some async calls that need arg..
// ..then return result async Task..
return new object();
}
public static async Task<IEnumerable<T>> Throttle<T>(IEnumerable<Func<Task<T>>> toRun, int throttleTo)
{
var running = new List<Task<T>>(throttleTo);
var completed = new List<Task<T>>(toRun.Count());
foreach(var taskToRun in toRun)
{
running.Add(taskToRun());
if(running.Count == throttleTo)
{
var comTask = await Task.WhenAny(running);
running.Remove(comTask);
completed.Add(comTask);
}
}
return completed.Select(t => t.Result);
}
public static async Task Throttle(this IEnumerable<Func<Task>> toRun, int throttleTo)
{
var running = new List<Task>(throttleTo);
foreach(var taskToRun in toRun)
{
running.Add(taskToRun());
if(running.Count == throttleTo)
{
var comTask = await Task.WhenAny(running);
running.Remove(comTask);
}
}
}
首先,从线程中抽象出来。特别是因为您的操作是异步的,所以您根本不应该考虑 "threads" 。在异步世界中,您有任务,与线程相比,您可以拥有 巨大 数量的任务。
可以使用 SemaphoreSlim
:
static async Task DoSomething(int n);
static void RunConcurrently(int total, int throttle)
{
var mutex = new SemaphoreSlim(throttle);
var tasks = Enumerable.Range(0, total).Select(async item =>
{
await mutex.WaitAsync();
try { await DoSomething(item); }
finally { mutex.Release(); }
});
Task.WhenAll(tasks).Wait();
}
您需要的是自定义任务计划程序。您可以从 System.Threading.Tasks.TaskScheduler 派生一个 class 并实现两个主要函数 GetScheduledTasks()
、QueueTask()
以及其他函数来获得对节流任务的完全控制。这是一个有据可查的示例。
https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0
您实际上可以模拟作为 .NET 6 的一部分引入的 Parallel.ForEachAsync
方法。为了模拟相同的方法,您可以使用以下代码。
public static Task ForEachAsync<T>(IEnumerable<T> source, int maxDegreeOfParallelism, Func<T, Task> body) {
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
.NET 6 引入了 Parallel.ForEachAsync
。您可以这样重写代码:
static async ValueTask DoSomething(int n)
{
...
}
static Task RunThreads(int totalThreads, int throttle)
=> Parallel.ForEachAsync(Enumerable.Range(0, totalThreads), new ParallelOptions() { MaxDegreeOfParallelism = throttle }, (i, _) => DoSomething(i));
备注:
- 我不得不将您的
DoSomething
函数的 return 类型从Task
更改为ValueTask
。 - 您可能想避免
.Wait()
调用,所以我将RunThreads
方法设为异步。 - 从您的示例中看不出为什么您需要访问各个任务。此代码不会让您访问任务,但在许多情况下可能仍然有用。