WaitAll 等待更改列表<Task>
WaitAll for Changing List<Task>
已更新以更清楚地解释事情
我有一个应用程序 运行 有很多任务。有些是最初创建的,有些可以稍后添加。我需要一个编程结构来等待所有任务完成。一旦所有任务完成,一些其他代码应该 运行 清理事情并对其他任务生成的数据进行一些最终处理。
我想出了一个方法来做到这一点,但不会称之为优雅。所以我想看看有没有更好的方法。
我所做的是将任务列表保存在 ConcurrentBag(线程安全集合)中。在流程开始时,我创建了一些任务并将其添加到 ConcurrentBag。如果创建了一个也需要在最后步骤之前完成的新任务,那么当流程执行它的任务时,我还将它添加到 ConcurrentBag。
Task.Wait 接受一组任务作为其参数。我可以将 ConcurrentBag 转换成数组,但该数组将不包含在调用 Task.Wait 后添加到包中的任何任务。
所以我在 do while 循环中有一个两步等待过程。在循环体中,我对从 Bag 生成的数组执行简单的 Task.Wait。当它完成时,意味着所有原始任务都已完成。然后在 while 测试中,我对从 ConcurrentBag 生成的新数组进行了 1 毫秒的快速测试。如果没有添加新任务,或者任何新任务也已完成,它将 return 为真,因此非条件退出循环。
如果它 return 为假(因为添加了一个未完成的新任务),我们返回并执行非定时 Task.Wait。然后冲洗并重复,直到完成所有新旧任务。
// defined on the class, perhaps they should be properties
CancellationTokenSource Source = new CancellationTokenSource();
CancellationToken Token = Source.Token;
ConcurrentBag<Task> ToDoList = new ConcurrentBag<Task>();
public void RunAndWait() {
// start some tasks add them to the list
for (int i = 0; i < 12; i++)
{
Task task = new Task(() => SillyExample(Token), Token);
ToDoList.Add(task);
task.Start();
}
// now wait for those task, and any other tasks added to ToDoList to complete
try
{
do
{
Task.WaitAll(ToDoList.ToArray(), Token);
} while (! Task.WaitAll(ToDoList.ToArray(), 1, Token));
}
catch (OperationCanceledException e)
{
// any special handling of cancel we might want to do
}
// code that should only run after all tasks complete
}
有没有更优雅的方法来做到这一点?
我建议使用 ConcurrentQueue
并在等待时移除项目。由于队列的先进先出性质,如果你到达队列中没有剩余任何任务的地步,你就知道你已经等待了所有已添加到该点的任务。
ConcurrentQueue<Task> ToDoQueue = new ConcurrentQueue<Task>();
...
while(ToDoQueue.Count > 0 && !Token.IsCancellationRequested)
{
Task task;
if(ToDoQueue.TryDequeue(out task))
{
task.Wait(Token);
}
}
这是使用 Microsoft 的 Reactive Framework (NuGet "Rx-Main") 的一种非常酷的方法。
var taskSubject = new Subject<Task>();
var query = taskSubject.Select(t => Observable.FromAsync(() => t)).Merge();
var subscription =
query.Subscribe(
u => { /* Each Task Completed */ },
() => Console.WriteLine("All Tasks Completed."));
现在,要添加任务,只需执行以下操作:
taskSubject.OnNext(Task.Run(() => { }));
taskSubject.OnNext(Task.Run(() => { }));
taskSubject.OnNext(Task.Run(() => { }));
然后发出完成信号:
taskSubject.OnCompleted();
重要的是要注意信号完成不会立即完成查询,它也会等待所有任务完成。信号完成只是表示您将不再添加任何新任务。
最后,如果你想取消,那么就这样做:
subscription.Dispose();
已更新以更清楚地解释事情
我有一个应用程序 运行 有很多任务。有些是最初创建的,有些可以稍后添加。我需要一个编程结构来等待所有任务完成。一旦所有任务完成,一些其他代码应该 运行 清理事情并对其他任务生成的数据进行一些最终处理。
我想出了一个方法来做到这一点,但不会称之为优雅。所以我想看看有没有更好的方法。
我所做的是将任务列表保存在 ConcurrentBag(线程安全集合)中。在流程开始时,我创建了一些任务并将其添加到 ConcurrentBag。如果创建了一个也需要在最后步骤之前完成的新任务,那么当流程执行它的任务时,我还将它添加到 ConcurrentBag。
Task.Wait 接受一组任务作为其参数。我可以将 ConcurrentBag 转换成数组,但该数组将不包含在调用 Task.Wait 后添加到包中的任何任务。
所以我在 do while 循环中有一个两步等待过程。在循环体中,我对从 Bag 生成的数组执行简单的 Task.Wait。当它完成时,意味着所有原始任务都已完成。然后在 while 测试中,我对从 ConcurrentBag 生成的新数组进行了 1 毫秒的快速测试。如果没有添加新任务,或者任何新任务也已完成,它将 return 为真,因此非条件退出循环。
如果它 return 为假(因为添加了一个未完成的新任务),我们返回并执行非定时 Task.Wait。然后冲洗并重复,直到完成所有新旧任务。
// defined on the class, perhaps they should be properties
CancellationTokenSource Source = new CancellationTokenSource();
CancellationToken Token = Source.Token;
ConcurrentBag<Task> ToDoList = new ConcurrentBag<Task>();
public void RunAndWait() {
// start some tasks add them to the list
for (int i = 0; i < 12; i++)
{
Task task = new Task(() => SillyExample(Token), Token);
ToDoList.Add(task);
task.Start();
}
// now wait for those task, and any other tasks added to ToDoList to complete
try
{
do
{
Task.WaitAll(ToDoList.ToArray(), Token);
} while (! Task.WaitAll(ToDoList.ToArray(), 1, Token));
}
catch (OperationCanceledException e)
{
// any special handling of cancel we might want to do
}
// code that should only run after all tasks complete
}
有没有更优雅的方法来做到这一点?
我建议使用 ConcurrentQueue
并在等待时移除项目。由于队列的先进先出性质,如果你到达队列中没有剩余任何任务的地步,你就知道你已经等待了所有已添加到该点的任务。
ConcurrentQueue<Task> ToDoQueue = new ConcurrentQueue<Task>();
...
while(ToDoQueue.Count > 0 && !Token.IsCancellationRequested)
{
Task task;
if(ToDoQueue.TryDequeue(out task))
{
task.Wait(Token);
}
}
这是使用 Microsoft 的 Reactive Framework (NuGet "Rx-Main") 的一种非常酷的方法。
var taskSubject = new Subject<Task>();
var query = taskSubject.Select(t => Observable.FromAsync(() => t)).Merge();
var subscription =
query.Subscribe(
u => { /* Each Task Completed */ },
() => Console.WriteLine("All Tasks Completed."));
现在,要添加任务,只需执行以下操作:
taskSubject.OnNext(Task.Run(() => { }));
taskSubject.OnNext(Task.Run(() => { }));
taskSubject.OnNext(Task.Run(() => { }));
然后发出完成信号:
taskSubject.OnCompleted();
重要的是要注意信号完成不会立即完成查询,它也会等待所有任务完成。信号完成只是表示您将不再添加任何新任务。
最后,如果你想取消,那么就这样做:
subscription.Dispose();