我怎样才能等待一系列任务并停止等待第一个异常?
How can I await an array of tasks and stop waiting on first exception?
我有一系列任务,我正在等待 Task.WhenAll
。我的任务经常失败,在这种情况下,我会用一个消息框通知用户,以便她可以重试。我的问题是报告错误会延迟到所有任务完成。相反,我想在第一个任务抛出异常时立即通知用户。换句话说,我想要一个快速失败的 Task.WhenAll
版本。由于不存在这样的内置方法,我尝试自己制作,但我的实现并不像我想要的那样。这是我想出的:
public static async Task<TResult[]> WhenAllFailFast<TResult>(
params Task<TResult>[] tasks)
{
foreach (var task in tasks)
{
await task.ConfigureAwait(false);
}
return await Task.WhenAll(tasks).ConfigureAwait(false);
}
这通常比原生 Task.WhenAll
抛出速度更快,但通常不够快。在任务 #1 完成之前,不会观察到故障任务 #2。我怎样才能改进它,让它尽快失败?
更新: 关于取消,现在不在我的要求范围内,但可以说为了保持一致性,第一个取消的任务应该立即停止等待。在这种情况下,从 WhenAllFailFast
返回的组合任务应该具有 Status == TaskStatus.Canceled
.
澄清:取消场景是关于用户单击 取消 按钮停止任务完成。不是在异常情况下自动取消未完成的任务。
您最好的选择是使用 TaskCompletionSource
构建您的 WhenAllFailFast
方法。您可以 .ContinueWith() 每个具有同步延续的输入任务,当任务以故障状态结束时(使用相同的异常对象)使 TCS 出错。
可能类似于(未完全测试):
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Whosebug
{
class Program
{
static async Task Main(string[] args)
{
var cts = new CancellationTokenSource();
cts.Cancel();
var arr = await WhenAllFastFail(
Task.FromResult(42),
Task.Delay(2000).ContinueWith<int>(t => throw new Exception("ouch")),
Task.FromCanceled<int>(cts.Token));
Console.WriteLine("Hello World!");
}
public static Task<TResult[]> WhenAllFastFail<TResult>(params Task<TResult>[] tasks)
{
if (tasks is null || tasks.Length == 0) return Task.FromResult(Array.Empty<TResult>());
// defensive copy.
var defensive = tasks.Clone() as Task<TResult>[];
var tcs = new TaskCompletionSource<TResult[]>();
var remaining = defensive.Length;
Action<Task> check = t =>
{
switch (t.Status)
{
case TaskStatus.Faulted:
// we 'try' as some other task may beat us to the punch.
tcs.TrySetException(t.Exception.InnerException);
break;
case TaskStatus.Canceled:
// we 'try' as some other task may beat us to the punch.
tcs.TrySetCanceled();
break;
default:
// we can safely set here as no other task remains to run.
if (Interlocked.Decrement(ref remaining) == 0)
{
// get the results into an array.
var results = new TResult[defensive.Length];
for (var i = 0; i < tasks.Length; ++i) results[i] = defensive[i].Result;
tcs.SetResult(results);
}
break;
}
};
foreach (var task in defensive)
{
task.ContinueWith(check, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
return tcs.Task;
}
}
}
编辑:解包 AggregateException,取消支持,return 结果数组。防御数组突变、null 和空。显式 TaskScheduler。
您的循环以伪串行方式等待每个任务,因此它会等待任务 1 完成,然后再检查任务 2 是否失败。
您可能会发现这篇文章对第一次失败后中止的模式很有帮助:http://gigi.nullneuron.net/gigilabs/patterns-for-asynchronous-composite-tasks-in-c/
public static async Task<TResult[]> WhenAllFailFast<TResult>(
params Task<TResult>[] tasks)
{
var taskList = tasks.ToList();
while (taskList.Count > 0)
{
var task = await Task.WhenAny(taskList).ConfigureAwait(false);
if(task.Exception != null)
{
// Left as an exercise for the reader:
// properly unwrap the AggregateException;
// handle the exception(s);
// cancel the other running tasks.
throw task.Exception.InnerException;
}
taskList.Remove(task);
}
return await Task.WhenAll(tasks).ConfigureAwait(false);
}
我最近再次需要 WhenAllFailFast
方法,我修改了@ZaldronGG 的 以使其性能更高(并且更符合 Stephen Cleary 的建议)。下面的实现在我的 PC 中每秒处理大约 3,500,000 个任务。
public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
{
if (tasks is null) throw new ArgumentNullException(nameof(tasks));
if (tasks.Length == 0) return Task.FromResult(new TResult[0]);
var results = new TResult[tasks.Length];
var remaining = tasks.Length;
var tcs = new TaskCompletionSource<TResult[]>(
TaskCreationOptions.RunContinuationsAsynchronously);
for (int i = 0; i < tasks.Length; i++)
{
var task = tasks[i];
if (task == null) throw new ArgumentException(
$"The {nameof(tasks)} argument included a null value.", nameof(tasks));
HandleCompletion(task, i);
}
return tcs.Task;
async void HandleCompletion(Task<TResult> task, int index)
{
try
{
var result = await task.ConfigureAwait(false);
results[index] = result;
if (Interlocked.Decrement(ref remaining) == 0)
{
tcs.TrySetResult(results);
}
}
catch (OperationCanceledException)
{
tcs.TrySetCanceled();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}
}
我要为这个问题再添加一个答案,不是因为我找到了更快的解决方案,而是因为我现在对在未知 [=12= 上启动多个 async void
操作有点怀疑].我在这里提出的解决方案要慢得多。它比@ZaldronGG 的 , and about 10 times slower than my previous 实现慢了大约 3 倍。它的优点是,在返回的 Task<TResult[]>
完成后,它不会泄漏附加在观察到的任务上的即发即弃的延续。当这个任务完成时,所有由WhenAllFailFast
方法内部创建的continuations 都被清除了。 API 的理想行为是普遍的,但在许多情况下它可能并不重要。
public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
var cts = new CancellationTokenSource();
Task<TResult> failedTask = null;
var continuationAction = new Action<Task<TResult>>(task =>
{
if (!task.IsCompletedSuccessfully)
if (Interlocked.CompareExchange(ref failedTask, task, null) == null)
cts.Cancel();
});
var continuations = tasks.Select(task => task.ContinueWith(continuationAction,
cts.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default));
return Task.WhenAll(continuations).ContinueWith(_ =>
{
cts.Dispose();
if (failedTask != null) return Task.WhenAll(failedTask);
// At this point all the tasks are completed successfully
return Task.WhenAll(tasks);
}, TaskScheduler.Default).Unwrap();
}
此实现与 ZaldronGG 的实现类似,它在每个任务上附加一个延续,不同之处在于这些延续是可取消的,并且当观察到第一个不成功的任务时,它们会被整体取消。它还使用了我最近发现的 ,它消除了手动完成 TaskCompletionSource<TResult[]>
实例的需要,通常实现简洁。
我有一系列任务,我正在等待 Task.WhenAll
。我的任务经常失败,在这种情况下,我会用一个消息框通知用户,以便她可以重试。我的问题是报告错误会延迟到所有任务完成。相反,我想在第一个任务抛出异常时立即通知用户。换句话说,我想要一个快速失败的 Task.WhenAll
版本。由于不存在这样的内置方法,我尝试自己制作,但我的实现并不像我想要的那样。这是我想出的:
public static async Task<TResult[]> WhenAllFailFast<TResult>(
params Task<TResult>[] tasks)
{
foreach (var task in tasks)
{
await task.ConfigureAwait(false);
}
return await Task.WhenAll(tasks).ConfigureAwait(false);
}
这通常比原生 Task.WhenAll
抛出速度更快,但通常不够快。在任务 #1 完成之前,不会观察到故障任务 #2。我怎样才能改进它,让它尽快失败?
更新: 关于取消,现在不在我的要求范围内,但可以说为了保持一致性,第一个取消的任务应该立即停止等待。在这种情况下,从 WhenAllFailFast
返回的组合任务应该具有 Status == TaskStatus.Canceled
.
澄清:取消场景是关于用户单击 取消 按钮停止任务完成。不是在异常情况下自动取消未完成的任务。
您最好的选择是使用 TaskCompletionSource
构建您的 WhenAllFailFast
方法。您可以 .ContinueWith() 每个具有同步延续的输入任务,当任务以故障状态结束时(使用相同的异常对象)使 TCS 出错。
可能类似于(未完全测试):
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Whosebug
{
class Program
{
static async Task Main(string[] args)
{
var cts = new CancellationTokenSource();
cts.Cancel();
var arr = await WhenAllFastFail(
Task.FromResult(42),
Task.Delay(2000).ContinueWith<int>(t => throw new Exception("ouch")),
Task.FromCanceled<int>(cts.Token));
Console.WriteLine("Hello World!");
}
public static Task<TResult[]> WhenAllFastFail<TResult>(params Task<TResult>[] tasks)
{
if (tasks is null || tasks.Length == 0) return Task.FromResult(Array.Empty<TResult>());
// defensive copy.
var defensive = tasks.Clone() as Task<TResult>[];
var tcs = new TaskCompletionSource<TResult[]>();
var remaining = defensive.Length;
Action<Task> check = t =>
{
switch (t.Status)
{
case TaskStatus.Faulted:
// we 'try' as some other task may beat us to the punch.
tcs.TrySetException(t.Exception.InnerException);
break;
case TaskStatus.Canceled:
// we 'try' as some other task may beat us to the punch.
tcs.TrySetCanceled();
break;
default:
// we can safely set here as no other task remains to run.
if (Interlocked.Decrement(ref remaining) == 0)
{
// get the results into an array.
var results = new TResult[defensive.Length];
for (var i = 0; i < tasks.Length; ++i) results[i] = defensive[i].Result;
tcs.SetResult(results);
}
break;
}
};
foreach (var task in defensive)
{
task.ContinueWith(check, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
return tcs.Task;
}
}
}
编辑:解包 AggregateException,取消支持,return 结果数组。防御数组突变、null 和空。显式 TaskScheduler。
您的循环以伪串行方式等待每个任务,因此它会等待任务 1 完成,然后再检查任务 2 是否失败。
您可能会发现这篇文章对第一次失败后中止的模式很有帮助:http://gigi.nullneuron.net/gigilabs/patterns-for-asynchronous-composite-tasks-in-c/
public static async Task<TResult[]> WhenAllFailFast<TResult>(
params Task<TResult>[] tasks)
{
var taskList = tasks.ToList();
while (taskList.Count > 0)
{
var task = await Task.WhenAny(taskList).ConfigureAwait(false);
if(task.Exception != null)
{
// Left as an exercise for the reader:
// properly unwrap the AggregateException;
// handle the exception(s);
// cancel the other running tasks.
throw task.Exception.InnerException;
}
taskList.Remove(task);
}
return await Task.WhenAll(tasks).ConfigureAwait(false);
}
我最近再次需要 WhenAllFailFast
方法,我修改了@ZaldronGG 的
public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
{
if (tasks is null) throw new ArgumentNullException(nameof(tasks));
if (tasks.Length == 0) return Task.FromResult(new TResult[0]);
var results = new TResult[tasks.Length];
var remaining = tasks.Length;
var tcs = new TaskCompletionSource<TResult[]>(
TaskCreationOptions.RunContinuationsAsynchronously);
for (int i = 0; i < tasks.Length; i++)
{
var task = tasks[i];
if (task == null) throw new ArgumentException(
$"The {nameof(tasks)} argument included a null value.", nameof(tasks));
HandleCompletion(task, i);
}
return tcs.Task;
async void HandleCompletion(Task<TResult> task, int index)
{
try
{
var result = await task.ConfigureAwait(false);
results[index] = result;
if (Interlocked.Decrement(ref remaining) == 0)
{
tcs.TrySetResult(results);
}
}
catch (OperationCanceledException)
{
tcs.TrySetCanceled();
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}
}
我要为这个问题再添加一个答案,不是因为我找到了更快的解决方案,而是因为我现在对在未知 [=12= 上启动多个 async void
操作有点怀疑].我在这里提出的解决方案要慢得多。它比@ZaldronGG 的 Task<TResult[]>
完成后,它不会泄漏附加在观察到的任务上的即发即弃的延续。当这个任务完成时,所有由WhenAllFailFast
方法内部创建的continuations 都被清除了。 API 的理想行为是普遍的,但在许多情况下它可能并不重要。
public static Task<TResult[]> WhenAllFailFast<TResult>(params Task<TResult>[] tasks)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
var cts = new CancellationTokenSource();
Task<TResult> failedTask = null;
var continuationAction = new Action<Task<TResult>>(task =>
{
if (!task.IsCompletedSuccessfully)
if (Interlocked.CompareExchange(ref failedTask, task, null) == null)
cts.Cancel();
});
var continuations = tasks.Select(task => task.ContinueWith(continuationAction,
cts.Token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default));
return Task.WhenAll(continuations).ContinueWith(_ =>
{
cts.Dispose();
if (failedTask != null) return Task.WhenAll(failedTask);
// At this point all the tasks are completed successfully
return Task.WhenAll(tasks);
}, TaskScheduler.Default).Unwrap();
}
此实现与 ZaldronGG 的实现类似,它在每个任务上附加一个延续,不同之处在于这些延续是可取消的,并且当观察到第一个不成功的任务时,它们会被整体取消。它还使用了我最近发现的 TaskCompletionSource<TResult[]>
实例的需要,通常实现简洁。