并行执行任务

Executing tasks in parallel

好的,基本上我有一堆任务 (10),我想同时启动它们并等待它们完成。完成后我想执行其他任务。我阅读了很多关于此的资源,但我无法针对我的特定情况做出正确的选择...

这是我目前拥有的(代码已简化):

public async Task RunTasks()
{
    var tasks = new List<Task>
    {
        new Task(async () => await DoWork()),
        //and so on with the other 9 similar tasks
    }


    Parallel.ForEach(tasks, task =>
    {
        task.Start();
    });

    Task.WhenAll(tasks).ContinueWith(done =>
    {
        //Run the other tasks
    });
}

//This function perform some I/O operations
public async Task DoWork()
{
    var results = await GetDataFromDatabaseAsync();
    foreach (var result in results)
    {
        await ReadFromNetwork(result.Url);
    }
}

所以我的问题是,当我通过 WhenAll 调用等待任务完成时,它告诉我所有任务都已结束,即使其中 none 已完成。我尝试在我的 foreach 中添加 Console.WriteLine,当我进入继续任务时,数据不断从我之前的 Task 中传入,但这些数据还没有真正完成。

我做错了什么?

您几乎不应该直接使用 Task 构造函数。在您的情况下,该任务只会触发您迫不及待的实际任务。

您可以简单地调用 DoWork 并取回任务,将其存储在列表中并等待所有任务完成。含义:

tasks.Add(DoWork());
// ...
await Task.WhenAll(tasks);

但是,异步方法 运行 同步直到到达未完成任务的第一个等待。如果您担心该部分花费的时间太长,请使用 Task.Run 将其卸载到另一个 ThreadPool 线程,然后将 that 任务存储在列表中:

tasks.Add(Task.Run(() => DoWork()));
// ...
await Task.WhenAll(tasks);

DoWork 方法是一个异步I/O 方法。这意味着您不需要多个线程来执行其中的几个,因为大多数时候该方法将异步等待 I/O 完成。一个线程就足够了。

public async Task RunTasks()
{
    var tasks = new List<Task>
    {
        DoWork(),
        //and so on with the other 9 similar tasks
    };

    await Task.WhenAll(tasks);

    //Run the other tasks            
}

您几乎应该 never use the Task constructor to create a new task. To create an asynchronous I/O task, simply call the async method. To create a task that will be executed on a thread pool thread, use Task.Run. You can read this article 详细解释 Task.Run 和创建任务的其他选项。

如果您想 运行 使用 TPL 在不同线程中并行执行这些任务,您可能需要这样的东西:

public async Task RunTasks()
{
    var tasks = new List<Func<Task>>
    {
       DoWork,
       //...
    };

    await Task.WhenAll(tasks.AsParallel().Select(async task => await task()));

    //Run the other tasks
}

这些方法仅并行化少量代码:将方法排队到线程池和 return 未完成的 Task。同样,对于如此少量的任务,并行化可能比 运行 异步花费更多的时间。仅当您的任务在第一次等待之前执行更长的(同步)工作时,这才有意义。

对于大多数情况,更好的方法是:

public async Task RunTasks()
{
    await Task.WhenAll(new [] 
    {
        DoWork(),
        //...
    });
    //Run the other tasks
}

我对你的代码的看法:

  1. 在传递给 Parallel.ForEach 之前,您不应该将代码包装在 Task 中。

  2. 你可以 await Task.WhenAll 而不是 ContinueWith.

从本质上讲,您是在混合使用两种不兼容的异步范例;即 Parallel.ForEach()async-await.

为所欲为,做一个或另一个。例如。您可以只使用 Parallel.For[Each]() 并完全放弃异步等待。 Parallel.For[Each]() 只会 return 当所有并行任务都完成后,您就可以继续执行其他任务了。

该代码还有一些其他问题:

  • 您将方法标记为异步但不要在其中等待(您拥有的等待是在委托中,而不是在方法中);

  • 你几乎肯定想要 .ConfigureAwait(false) 等待,特别是如果你不想在 UI 线程中立即使用结果。

只需在 Task.WhenAll

周围添加一个 try-catch 块

注意:抛出一个 System.AggregateException 的实例,它作为一个或多个已发生异常的包装器。这对于协调 Task.WaitAll() 和 Task.WaitAny() 等多个任务的方法很重要,因此 AggregateException 能够将所有异常包装在已发生的 运行 任务中。

try
{ 
    Task.WaitAll(tasks.ToArray());  
}
catch(AggregateException ex)
{ 
    foreach (Exception inner in ex.InnerExceptions)
    {
    Console.WriteLine(String.Format("Exception type {0} from {1}", inner.GetType(), inner.Source));
    }
}