异步等待某些任务完成 (Task.WhenSome)

Waiting asynchronously for some tasks to complete (Task.WhenSome)

我正在编写一项服务,该服务结合来自各种 Internet 来源的数据,并即时生成响应。速度比完整性更重要,所以我想在 一些 (不是全部)互联网资源响应后立即生成我的响应。通常我的服务创建 10 个并发 Web 请求,并且应该停止等待并在其中 5 个完成后开始处理。 .NET Framework 和我所知道的任何第三方库都没有提供此功能,因此我可能不得不自己编写它。我尝试实现的方法具有以下签名:

public static Task<TResult[]> WhenSome<TResult>(int atLeast, params Task<TResult>[] tasks)
{
    // TODO
}

Task.WhenAny 的工作方式相反,如果已获取所需数量的结果,则应吞下异常。然而,如果在完成所有任务后,没有收集到足够的结果,那么应该抛出一个 AggregateException 来传播所有异常。

用法示例:

var tasks = new Task<int>[]
{
    Task.Delay(100).ContinueWith<int>(_ => throw new ApplicationException("Oops!")),
    Task.Delay(200).ContinueWith(_ => 10),
    Task.Delay(Timeout.Infinite).ContinueWith(_ => 0,
        new CancellationTokenSource(300).Token),
    Task.Delay(400).ContinueWith(_ => 20),
    Task.Delay(500).ContinueWith(_ => 30),
};
var results = await WhenSome(2, tasks);
Console.WriteLine($"Results: {String.Join(", ", results)}");

预期输出:

Results: 10, 20

在这个例子中,最后一个返回值 30 的任务应该被忽略(甚至不等待),因为我们已经获得了我们想要的结果数(2 个结果)。出于同样的原因,错误和取消的任务也应该被忽略。

这是一些笨拙的代码,我认为它可以满足您的要求。这可能是一个起点。

这也可能是一种糟糕的处理任务的方式and/or 不是线程安全的,and/or 只是一个糟糕的想法。但我希望有人会指出这一点。

async Task<TResult[]> WhenSome<TResult>(int atLeast, List<Task<TResult>> tasks)
{
    List<Task<TResult>> completedTasks = new List<System.Threading.Tasks.Task<TResult>>();
    int completed = 0;
    List<Exception> exceptions = new List<Exception>();

    while (completed < atLeast && tasks.Any()) {
        var completedTask = await Task.WhenAny(tasks);
        tasks.Remove(completedTask);

        if (completedTask.IsCanceled)
        {
            continue;
        }

        if (completedTask.IsFaulted)
        {
            exceptions.Add(completedTask.Exception);
            continue;
        }

        completed++;
        completedTasks.Add(completedTask);
    }

    if (completed >= atLeast)
    {
        return completedTasks.Select(t => t.Result).ToArray();
    }

    throw new AggregateException(exceptions).Flatten();
}

我正在为这个问题添加一个解决方案,不是因为 in not sufficient, but just for the sake of variety. This implementation uses the 为了 return 一个包含所有异常的 Task,其方式与构建的完全相同-in Task.WhenAll 方法传播所有异常。

public static Task<TResult[]> WhenSome<TResult>(int atLeast, params Task<TResult>[] tasks)
{
    if (tasks == null) throw new ArgumentNullException(nameof(tasks));
    if (atLeast < 1 || atLeast > tasks.Length)
        throw new ArgumentOutOfRangeException(nameof(atLeast));

    var cts = new CancellationTokenSource();
    int successfulCount = 0;
    var continuationAction = new Action<Task<TResult>>(task =>
    {
        if (task.IsCompletedSuccessfully)
            if (Interlocked.Increment(ref successfulCount) == atLeast) cts.Cancel();
    });
    var continuations = tasks.Select(task => task.ContinueWith(continuationAction,
        cts.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default));

    return Task.WhenAll(continuations).ContinueWith(_ =>
    {
        cts.Dispose();
        if (successfulCount >= atLeast) // Success
            return Task.WhenAll(tasks.Where(task => task.IsCompletedSuccessfully));
        else
            return Task.WhenAll(tasks); // Failure
    }, TaskScheduler.Default).Unwrap();
}

continuations 不会传播 tasks 的结果或异常。这些是可取消的延续,当达到指定的成功任务数时,它们将被整体取消。

注意: 此实现可能会传播超过 atLeast 个结果。如果你想要这个数量的结果,你可以在 .Where LINQ operatgor 之后链接一个 .Take(atLeast)