如何等待可枚举的任务并在多个任务完成后停止?

How can I await an enumerable of tasks and stop when a number of tasks have completed?

我有一组任务 运行 相同的作业,但在不同的服务器上使用不同的参数。可能会发生其中一台服务器 unresponsive/slow 导致所有任务都已完成但只有一个的情况。目前我正在使用 Task.WhenAll() 等待他们,所以别无选择,只能等到我的超时到期。

在理想情况下,所有任务都在超时内完成,我可以收集所有结果,但在另一种情况下,基本上我想等待:

如果到 n 个任务已经完成并且我们又等了 x 分钟,并不是所有的任务都完成了,我想检索已完成任务的结果。

有什么方法可以实现上述目标吗?

使用 Task.WhenAny 了解是否有任何任务完成,然后从数组中删除已完成的任务。

stopWatch.Start();
while (arrayoftasks.Any())
{
    Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
    arrayOfTasks.Remove(finishedTask);
    await finishedTask;
    finishedCount++;
    if (finishedCount == 4) //check you stopwatch elapsed here.
    {
        Console.WriteLine("4 tasks have finished");
    }
}

工作示例:

using System.Diagnostics;
using System.Security.Cryptography;

await Test.Go();
Console.ReadLine();
public static class Test
{
    public static async Task Go()
    {
        List<Task<string>> arrayOfTasks = GetArrayOfTasks();
        int finishedCount = 0;
        Stopwatch stopWatch = new Stopwatch();
        stopWatch.Start();
        while (arrayOfTasks.Any())
        {
            Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
            arrayOfTasks.Remove(finishedTask);
            Console.WriteLine(await finishedTask);
            finishedCount++;
            if (finishedCount == 4) //check you stopwatch elapsed here too
            {
                Console.WriteLine($":::{finishedCount} tasks have finished, {arrayOfTasks.Count} to go");
            }
        }
    }

    private static List<Task<string>> GetArrayOfTasks()
    {
        List<Task<string>> taskList = new();
        for (int i = 0; i < 10; i++)
        {
            var t = GetString(i);
            taskList.Add(t);
        }
        return taskList;
    }

    private static async Task<string> GetString(int i)
    {
        await Task.Delay(RandomNumberGenerator.GetInt32(1, 5000));
        return i.ToString();
    }
}   

Rx.Net 是最优雅的实现方式。

public IAsyncEnumerable<TResult> DoStuff<TResult>(IEnumerable<Func<CancellationToken, Task<TResult>>> tasks)
{
    var inputs = tasks
            // convert this into IObservable<TResult>
            // this type, like IAsyncEnumerable, contains
            // async logic, and cancellation...
            .ToObservable()
            .Select(task => Observable.FromAsync(task))
            .Merge()
            // publish/refcount is needed to ensure
            // we only run the tasks once, and share
            // the "result/event".
            .Publish()
            .RefCount();
                         // On the 100th Item
    var timeoutSignal = inputs.Skip(100 - 1)
                          .Take(1)
                          // Generate a signal 10 minutes after the 100th 
                          // item arrives
                          .Delay(TimeSpan.FromMinutes(10));
    return inputs
            // Take items until the timeout signal
            .TakeUntil(timeoutSignal)
            .ToAsyncEnumerable();
    
}

var items = await DoStuff(tasks).ToListAsync()

即使您有复杂的取消逻辑,您也想取消基础任务。如果底层任务在合适的时间被取消,那么无论如何都可以使用Task.WhenAll

所以分解你的问题,你要问的是,'How can I cancel tasks based on the state of other tasks?'。您需要保持已完成任务数量的状态,并根据该状态取消您的任务。

如果您需要在任务完成时执行 'stuff'(例如更新已完成任务数的状态),我发现延续很有用并且是一个非常干净的解决方案。您的用例示例:

// n from your question
var n = 4; 

// number of tasks currently completed
var tasksCompleted = 0; 

// The list of tasks (note it's the continuations in this case)
// You can also keep the continuations and actual tasks in separate lists.
var tasks = new List<Task>();

// delay before cancellation after n tasks completed
var timeAfterNCompleted = TimeSpan.FromMinutes(x); 
using var cts = new CancellationTokenSource();

for (int i = 0; i < 10; i++)
{
    // Do your work with a passed cancellationtoken you control
    var currentTask = DoWorkAsync(i, cts.Token);

    // Continuation will update the state of completed tasks
    currentTask = currentTask.ContinueWith((t) => 
    {
        if (t.IsCompletedSuccessfully)
        {
            var number = Interlocked.Increment(ref tasksCompleted);
            if (number == n)
            {
                // If we passed n tasks completed successfully,
                // We'll cancel after the grace period
                // Note that this will actually cancel the underlying tasks
                // Because we passed the token to the DoWorkAsync method
                cts.CancelAfter(timeAfterNCompleted);
            }
        }
    });
    tasks.Add(currentTask);
}

await Task.WhenAll(tasks);

// All your tasks have either completed or cancelled here
// Note that in this specific example all tasks will appear
// to have run to completion. That's because we're looking at
// the continuations here. Store continuation and actual task
// in separate lists and you can retrieve the results.
// (Make sure you await the continuations though)