异步等待某些任务完成 (Task.WhenSome)
Waiting asynchronously for some tasks to complete (Task.WhenSome)
我正在编写一项服务,该服务结合来自各种 Internet 来源的数据,并即时生成响应。速度比完整性更重要,所以我想在 一些 (不是全部)互联网资源响应后立即生成我的响应。通常我的服务创建 10 个并发 Web 请求,并且应该停止等待并在其中 5 个完成后开始处理。 .NET Framework 和我所知道的任何第三方库都没有提供此功能,因此我可能不得不自己编写它。我尝试实现的方法具有以下签名:
public static Task<TResult[]> WhenSome<TResult>(int atLeast, params Task<TResult>[] tasks)
{
// TODO
}
与 Task.WhenAny
的工作方式相反,如果已获取所需数量的结果,则应吞下异常。然而,如果在完成所有任务后,没有收集到足够的结果,那么应该抛出一个 AggregateException
来传播所有异常。
用法示例:
var tasks = new Task<int>[]
{
Task.Delay(100).ContinueWith<int>(_ => throw new ApplicationException("Oops!")),
Task.Delay(200).ContinueWith(_ => 10),
Task.Delay(Timeout.Infinite).ContinueWith(_ => 0,
new CancellationTokenSource(300).Token),
Task.Delay(400).ContinueWith(_ => 20),
Task.Delay(500).ContinueWith(_ => 30),
};
var results = await WhenSome(2, tasks);
Console.WriteLine($"Results: {String.Join(", ", results)}");
预期输出:
Results: 10, 20
在这个例子中,最后一个返回值 30
的任务应该被忽略(甚至不等待),因为我们已经获得了我们想要的结果数(2 个结果)。出于同样的原因,错误和取消的任务也应该被忽略。
这是一些笨拙的代码,我认为它可以满足您的要求。这可能是一个起点。
这也可能是一种糟糕的处理任务的方式and/or 不是线程安全的,and/or 只是一个糟糕的想法。但我希望有人会指出这一点。
async Task<TResult[]> WhenSome<TResult>(int atLeast, List<Task<TResult>> tasks)
{
List<Task<TResult>> completedTasks = new List<System.Threading.Tasks.Task<TResult>>();
int completed = 0;
List<Exception> exceptions = new List<Exception>();
while (completed < atLeast && tasks.Any()) {
var completedTask = await Task.WhenAny(tasks);
tasks.Remove(completedTask);
if (completedTask.IsCanceled)
{
continue;
}
if (completedTask.IsFaulted)
{
exceptions.Add(completedTask.Exception);
continue;
}
completed++;
completedTasks.Add(completedTask);
}
if (completed >= atLeast)
{
return completedTasks.Select(t => t.Result).ToArray();
}
throw new AggregateException(exceptions).Flatten();
}
我正在为这个问题添加一个解决方案,不是因为 in not sufficient, but just for the sake of variety. This implementation uses the 为了 return 一个包含所有异常的 Task
,其方式与构建的完全相同-in Task.WhenAll
方法传播所有异常。
public static Task<TResult[]> WhenSome<TResult>(int atLeast, params Task<TResult>[] tasks)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
if (atLeast < 1 || atLeast > tasks.Length)
throw new ArgumentOutOfRangeException(nameof(atLeast));
var cts = new CancellationTokenSource();
int successfulCount = 0;
var continuationAction = new Action<Task<TResult>>(task =>
{
if (task.IsCompletedSuccessfully)
if (Interlocked.Increment(ref successfulCount) == atLeast) cts.Cancel();
});
var continuations = tasks.Select(task => task.ContinueWith(continuationAction,
cts.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default));
return Task.WhenAll(continuations).ContinueWith(_ =>
{
cts.Dispose();
if (successfulCount >= atLeast) // Success
return Task.WhenAll(tasks.Where(task => task.IsCompletedSuccessfully));
else
return Task.WhenAll(tasks); // Failure
}, TaskScheduler.Default).Unwrap();
}
continuations
不会传播 tasks
的结果或异常。这些是可取消的延续,当达到指定的成功任务数时,它们将被整体取消。
注意: 此实现可能会传播超过 atLeast
个结果。如果你想要这个数量的结果,你可以在 .Where
LINQ operatgor 之后链接一个 .Take(atLeast)
。
我正在编写一项服务,该服务结合来自各种 Internet 来源的数据,并即时生成响应。速度比完整性更重要,所以我想在 一些 (不是全部)互联网资源响应后立即生成我的响应。通常我的服务创建 10 个并发 Web 请求,并且应该停止等待并在其中 5 个完成后开始处理。 .NET Framework 和我所知道的任何第三方库都没有提供此功能,因此我可能不得不自己编写它。我尝试实现的方法具有以下签名:
public static Task<TResult[]> WhenSome<TResult>(int atLeast, params Task<TResult>[] tasks)
{
// TODO
}
与 Task.WhenAny
的工作方式相反,如果已获取所需数量的结果,则应吞下异常。然而,如果在完成所有任务后,没有收集到足够的结果,那么应该抛出一个 AggregateException
来传播所有异常。
用法示例:
var tasks = new Task<int>[]
{
Task.Delay(100).ContinueWith<int>(_ => throw new ApplicationException("Oops!")),
Task.Delay(200).ContinueWith(_ => 10),
Task.Delay(Timeout.Infinite).ContinueWith(_ => 0,
new CancellationTokenSource(300).Token),
Task.Delay(400).ContinueWith(_ => 20),
Task.Delay(500).ContinueWith(_ => 30),
};
var results = await WhenSome(2, tasks);
Console.WriteLine($"Results: {String.Join(", ", results)}");
预期输出:
Results: 10, 20
在这个例子中,最后一个返回值 30
的任务应该被忽略(甚至不等待),因为我们已经获得了我们想要的结果数(2 个结果)。出于同样的原因,错误和取消的任务也应该被忽略。
这是一些笨拙的代码,我认为它可以满足您的要求。这可能是一个起点。
这也可能是一种糟糕的处理任务的方式and/or 不是线程安全的,and/or 只是一个糟糕的想法。但我希望有人会指出这一点。
async Task<TResult[]> WhenSome<TResult>(int atLeast, List<Task<TResult>> tasks)
{
List<Task<TResult>> completedTasks = new List<System.Threading.Tasks.Task<TResult>>();
int completed = 0;
List<Exception> exceptions = new List<Exception>();
while (completed < atLeast && tasks.Any()) {
var completedTask = await Task.WhenAny(tasks);
tasks.Remove(completedTask);
if (completedTask.IsCanceled)
{
continue;
}
if (completedTask.IsFaulted)
{
exceptions.Add(completedTask.Exception);
continue;
}
completed++;
completedTasks.Add(completedTask);
}
if (completed >= atLeast)
{
return completedTasks.Select(t => t.Result).ToArray();
}
throw new AggregateException(exceptions).Flatten();
}
我正在为这个问题添加一个解决方案,不是因为 Task
,其方式与构建的完全相同-in Task.WhenAll
方法传播所有异常。
public static Task<TResult[]> WhenSome<TResult>(int atLeast, params Task<TResult>[] tasks)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
if (atLeast < 1 || atLeast > tasks.Length)
throw new ArgumentOutOfRangeException(nameof(atLeast));
var cts = new CancellationTokenSource();
int successfulCount = 0;
var continuationAction = new Action<Task<TResult>>(task =>
{
if (task.IsCompletedSuccessfully)
if (Interlocked.Increment(ref successfulCount) == atLeast) cts.Cancel();
});
var continuations = tasks.Select(task => task.ContinueWith(continuationAction,
cts.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default));
return Task.WhenAll(continuations).ContinueWith(_ =>
{
cts.Dispose();
if (successfulCount >= atLeast) // Success
return Task.WhenAll(tasks.Where(task => task.IsCompletedSuccessfully));
else
return Task.WhenAll(tasks); // Failure
}, TaskScheduler.Default).Unwrap();
}
continuations
不会传播 tasks
的结果或异常。这些是可取消的延续,当达到指定的成功任务数时,它们将被整体取消。
注意: 此实现可能会传播超过 atLeast
个结果。如果你想要这个数量的结果,你可以在 .Where
LINQ operatgor 之后链接一个 .Take(atLeast)
。