WaitAll 等待更改列表<Task>

WaitAll for Changing List<Task>

已更新以更清楚地解释事情

我有一个应用程序 运行 有很多任务。有些是最初创建的,有些可以稍后添加。我需要一个编程结构来等待所有任务完成。一旦所有任务完成,一些其他代码应该 运行 清理事情并对其他任务生成的数据进行一些最终处理。

我想出了一个方法来做到这一点,但不会称之为优雅。所以我想看看有没有更好的方法。

我所做的是将任务列表保存在 ConcurrentBag(线程安全集合)中。在流程开始时,我创建了一些任务并将其添加到 ConcurrentBag。如果创建了一个也需要在最后步骤之前完成的新任务,那么当流程执行它的任务时,我还将它添加到 ConcurrentBag。

Task.Wait 接受一组任务作为其参数。我可以将 ConcurrentBag 转换成数组,但该数组将不包含在调用 Task.Wait 后添加到包中的任何任务。

所以我在 do while 循环中有一个两步等待过程。在循环体中,我对从 Bag 生成的数组执行简单的 Task.Wait。当它完成时,意味着所有原始任务都已完成。然后在 while 测试中,我对从 ConcurrentBag 生成的新数组进行了 1 毫秒的快速测试。如果没有添加新任务,或者任何新任务也已完成,它将 return 为真,因此非条件退出循环。

如果它 return 为假(因为添加了一个未完成的新任务),我们返回并执行非定时 Task.Wait。然后冲洗并重复,直到完成所有新旧任务。

// defined on the class, perhaps they should be properties
CancellationTokenSource Source = new CancellationTokenSource();
CancellationToken Token = Source.Token;
ConcurrentBag<Task> ToDoList = new ConcurrentBag<Task>();


public void RunAndWait() {
    // start some tasks add them to the list
    for (int i = 0; i < 12; i++)
    {
        Task task = new Task(() => SillyExample(Token), Token);
        ToDoList.Add(task); 
        task.Start();
    }

    // now wait for those task, and any other tasks added to ToDoList to complete
    try
    {
        do 
        {
            Task.WaitAll(ToDoList.ToArray(), Token);
        } while (! Task.WaitAll(ToDoList.ToArray(), 1, Token));
    }
    catch (OperationCanceledException e)
    {
        // any special handling of cancel we might want to do
    }

    // code that should only run after all tasks complete
}

有没有更优雅的方法来做到这一点?

我建议使用 ConcurrentQueue 并在等待时移除项目。由于队列的先进先出性质,如果你到达队列中没有剩余任何任务的地步,你就知道你已经等待了所有已添加到该点的任务。

ConcurrentQueue<Task> ToDoQueue = new ConcurrentQueue<Task>();

...

    while(ToDoQueue.Count > 0 && !Token.IsCancellationRequested) 
    {
        Task task;
        if(ToDoQueue.TryDequeue(out task))
        {
            task.Wait(Token);
        }
    }        

这是使用 Microsoft 的 Reactive Framework (NuGet "Rx-Main") 的一种非常酷的方法。

var taskSubject = new Subject<Task>();

var query = taskSubject.Select(t => Observable.FromAsync(() => t)).Merge();

var subscription = 
    query.Subscribe(
        u => { /* Each Task Completed */ },
        () => Console.WriteLine("All Tasks Completed."));

现在,要添加任务,只需执行以下操作:

taskSubject.OnNext(Task.Run(() => { }));
taskSubject.OnNext(Task.Run(() => { }));
taskSubject.OnNext(Task.Run(() => { }));

然后发出完成信号:

taskSubject.OnCompleted();

重要的是要注意信号完成不会立即完成查询,它也会等待所有任务完成。信号完成只是表示您将不再添加任何新任务。

最后,如果你想取消,那么就这样做:

subscription.Dispose();