如何等待可枚举的任务并在多个任务完成后停止?
How can I await an enumerable of tasks and stop when a number of tasks have completed?
我有一组任务 运行 相同的作业,但在不同的服务器上使用不同的参数。可能会发生其中一台服务器 unresponsive/slow 导致所有任务都已完成但只有一个的情况。目前我正在使用 Task.WhenAll()
等待他们,所以别无选择,只能等到我的超时到期。
在理想情况下,所有任务都在超时内完成,我可以收集所有结果,但在另一种情况下,基本上我想等待:
- 直到 n 个任务完成
- 另外 x 分钟,如果 n 个任务已完成
如果到 n 个任务已经完成并且我们又等了 x 分钟,并不是所有的任务都完成了,我想检索已完成任务的结果。
有什么方法可以实现上述目标吗?
使用 Task.WhenAny
了解是否有任何任务完成,然后从数组中删除已完成的任务。
stopWatch.Start();
while (arrayoftasks.Any())
{
Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
arrayOfTasks.Remove(finishedTask);
await finishedTask;
finishedCount++;
if (finishedCount == 4) //check you stopwatch elapsed here.
{
Console.WriteLine("4 tasks have finished");
}
}
工作示例:
using System.Diagnostics;
using System.Security.Cryptography;
await Test.Go();
Console.ReadLine();
public static class Test
{
public static async Task Go()
{
List<Task<string>> arrayOfTasks = GetArrayOfTasks();
int finishedCount = 0;
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
while (arrayOfTasks.Any())
{
Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
arrayOfTasks.Remove(finishedTask);
Console.WriteLine(await finishedTask);
finishedCount++;
if (finishedCount == 4) //check you stopwatch elapsed here too
{
Console.WriteLine($":::{finishedCount} tasks have finished, {arrayOfTasks.Count} to go");
}
}
}
private static List<Task<string>> GetArrayOfTasks()
{
List<Task<string>> taskList = new();
for (int i = 0; i < 10; i++)
{
var t = GetString(i);
taskList.Add(t);
}
return taskList;
}
private static async Task<string> GetString(int i)
{
await Task.Delay(RandomNumberGenerator.GetInt32(1, 5000));
return i.ToString();
}
}
Rx.Net 是最优雅的实现方式。
public IAsyncEnumerable<TResult> DoStuff<TResult>(IEnumerable<Func<CancellationToken, Task<TResult>>> tasks)
{
var inputs = tasks
// convert this into IObservable<TResult>
// this type, like IAsyncEnumerable, contains
// async logic, and cancellation...
.ToObservable()
.Select(task => Observable.FromAsync(task))
.Merge()
// publish/refcount is needed to ensure
// we only run the tasks once, and share
// the "result/event".
.Publish()
.RefCount();
// On the 100th Item
var timeoutSignal = inputs.Skip(100 - 1)
.Take(1)
// Generate a signal 10 minutes after the 100th
// item arrives
.Delay(TimeSpan.FromMinutes(10));
return inputs
// Take items until the timeout signal
.TakeUntil(timeoutSignal)
.ToAsyncEnumerable();
}
var items = await DoStuff(tasks).ToListAsync()
即使您有复杂的取消逻辑,您也想取消基础任务。如果底层任务在合适的时间被取消,那么无论如何都可以使用Task.WhenAll
。
所以分解你的问题,你要问的是,'How can I cancel tasks based on the state of other tasks?'。您需要保持已完成任务数量的状态,并根据该状态取消您的任务。
如果您需要在任务完成时执行 'stuff'(例如更新已完成任务数的状态),我发现延续很有用并且是一个非常干净的解决方案。您的用例示例:
// n from your question
var n = 4;
// number of tasks currently completed
var tasksCompleted = 0;
// The list of tasks (note it's the continuations in this case)
// You can also keep the continuations and actual tasks in separate lists.
var tasks = new List<Task>();
// delay before cancellation after n tasks completed
var timeAfterNCompleted = TimeSpan.FromMinutes(x);
using var cts = new CancellationTokenSource();
for (int i = 0; i < 10; i++)
{
// Do your work with a passed cancellationtoken you control
var currentTask = DoWorkAsync(i, cts.Token);
// Continuation will update the state of completed tasks
currentTask = currentTask.ContinueWith((t) =>
{
if (t.IsCompletedSuccessfully)
{
var number = Interlocked.Increment(ref tasksCompleted);
if (number == n)
{
// If we passed n tasks completed successfully,
// We'll cancel after the grace period
// Note that this will actually cancel the underlying tasks
// Because we passed the token to the DoWorkAsync method
cts.CancelAfter(timeAfterNCompleted);
}
}
});
tasks.Add(currentTask);
}
await Task.WhenAll(tasks);
// All your tasks have either completed or cancelled here
// Note that in this specific example all tasks will appear
// to have run to completion. That's because we're looking at
// the continuations here. Store continuation and actual task
// in separate lists and you can retrieve the results.
// (Make sure you await the continuations though)
我有一组任务 运行 相同的作业,但在不同的服务器上使用不同的参数。可能会发生其中一台服务器 unresponsive/slow 导致所有任务都已完成但只有一个的情况。目前我正在使用 Task.WhenAll()
等待他们,所以别无选择,只能等到我的超时到期。
在理想情况下,所有任务都在超时内完成,我可以收集所有结果,但在另一种情况下,基本上我想等待:
- 直到 n 个任务完成
- 另外 x 分钟,如果 n 个任务已完成
如果到 n 个任务已经完成并且我们又等了 x 分钟,并不是所有的任务都完成了,我想检索已完成任务的结果。
有什么方法可以实现上述目标吗?
使用 Task.WhenAny
了解是否有任何任务完成,然后从数组中删除已完成的任务。
stopWatch.Start();
while (arrayoftasks.Any())
{
Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
arrayOfTasks.Remove(finishedTask);
await finishedTask;
finishedCount++;
if (finishedCount == 4) //check you stopwatch elapsed here.
{
Console.WriteLine("4 tasks have finished");
}
}
工作示例:
using System.Diagnostics;
using System.Security.Cryptography;
await Test.Go();
Console.ReadLine();
public static class Test
{
public static async Task Go()
{
List<Task<string>> arrayOfTasks = GetArrayOfTasks();
int finishedCount = 0;
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
while (arrayOfTasks.Any())
{
Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
arrayOfTasks.Remove(finishedTask);
Console.WriteLine(await finishedTask);
finishedCount++;
if (finishedCount == 4) //check you stopwatch elapsed here too
{
Console.WriteLine($":::{finishedCount} tasks have finished, {arrayOfTasks.Count} to go");
}
}
}
private static List<Task<string>> GetArrayOfTasks()
{
List<Task<string>> taskList = new();
for (int i = 0; i < 10; i++)
{
var t = GetString(i);
taskList.Add(t);
}
return taskList;
}
private static async Task<string> GetString(int i)
{
await Task.Delay(RandomNumberGenerator.GetInt32(1, 5000));
return i.ToString();
}
}
Rx.Net 是最优雅的实现方式。
public IAsyncEnumerable<TResult> DoStuff<TResult>(IEnumerable<Func<CancellationToken, Task<TResult>>> tasks)
{
var inputs = tasks
// convert this into IObservable<TResult>
// this type, like IAsyncEnumerable, contains
// async logic, and cancellation...
.ToObservable()
.Select(task => Observable.FromAsync(task))
.Merge()
// publish/refcount is needed to ensure
// we only run the tasks once, and share
// the "result/event".
.Publish()
.RefCount();
// On the 100th Item
var timeoutSignal = inputs.Skip(100 - 1)
.Take(1)
// Generate a signal 10 minutes after the 100th
// item arrives
.Delay(TimeSpan.FromMinutes(10));
return inputs
// Take items until the timeout signal
.TakeUntil(timeoutSignal)
.ToAsyncEnumerable();
}
var items = await DoStuff(tasks).ToListAsync()
即使您有复杂的取消逻辑,您也想取消基础任务。如果底层任务在合适的时间被取消,那么无论如何都可以使用Task.WhenAll
。
所以分解你的问题,你要问的是,'How can I cancel tasks based on the state of other tasks?'。您需要保持已完成任务数量的状态,并根据该状态取消您的任务。
如果您需要在任务完成时执行 'stuff'(例如更新已完成任务数的状态),我发现延续很有用并且是一个非常干净的解决方案。您的用例示例:
// n from your question
var n = 4;
// number of tasks currently completed
var tasksCompleted = 0;
// The list of tasks (note it's the continuations in this case)
// You can also keep the continuations and actual tasks in separate lists.
var tasks = new List<Task>();
// delay before cancellation after n tasks completed
var timeAfterNCompleted = TimeSpan.FromMinutes(x);
using var cts = new CancellationTokenSource();
for (int i = 0; i < 10; i++)
{
// Do your work with a passed cancellationtoken you control
var currentTask = DoWorkAsync(i, cts.Token);
// Continuation will update the state of completed tasks
currentTask = currentTask.ContinueWith((t) =>
{
if (t.IsCompletedSuccessfully)
{
var number = Interlocked.Increment(ref tasksCompleted);
if (number == n)
{
// If we passed n tasks completed successfully,
// We'll cancel after the grace period
// Note that this will actually cancel the underlying tasks
// Because we passed the token to the DoWorkAsync method
cts.CancelAfter(timeAfterNCompleted);
}
}
});
tasks.Add(currentTask);
}
await Task.WhenAll(tasks);
// All your tasks have either completed or cancelled here
// Note that in this specific example all tasks will appear
// to have run to completion. That's because we're looking at
// the continuations here. Store continuation and actual task
// in separate lists and you can retrieve the results.
// (Make sure you await the continuations though)