对多个任务使用 ContinueWith

Using ContinueWith with Multiple Tasks

这与我想象的不太一样,需要很简单,启动一些任务来对一个对象进行操作。每个任务一个唯一的对象。第二部分是每个任务报告结果时的ContinueWith。但是,我没有得到 WhenAll 类型的行为。希望有人能帮我弄清楚。

_tasks = new Task<AnalysisResultArgs>[_beansList.Count];
for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
{
    _tasks[loopCnt] = Task<AnalysisResultArgs>.Factory.StartNew(() =>
    {
        return _beansList[loopCnt].Analyze(newBeanData);
    });
    await _tasks[loopCnt].ContinueWith(ReportResults, 
                  TaskContinuationOptions.RunContinuationsAsynchronously)  
    // do some housekeeping when all tasks are complete          
}

private void ReportResults(Task<AnalysisResultArgs> task)
{
     /* Do some serial operations
}

据我了解,_beansList.Count 任务将启动,并且通过在 ContinueWith 上使用 await,在所有任务完成之前,内务处理工作不会执行。我无法阻止,因为我需要确保能够限制传入数据以防止等待执行的任务过多。

我哪里搞砸了,等待实际上完成了,内务处理得到 运行,即使不是所有任务都 运行 完成。

您不是在等待所有任务,而是在循环中等待继续。您应该为此使用 Task.WhenAll 方法。另外,如果可以 运行 它在任务中,为什么还需要继续?像这样简化您的代码:

private void ReportResults(AnalysisResultArgs results)
{
     /* Do some serial operations */
}

...
_tasks = new Task<AnalysisResultArgs>[_beansList.Count];
for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
{
    var count = loopCnt;
    _tasks[count] = Task.Run(() =>
    {
        var results = _beansList[count].Analyze(newBeanData);
        ReportResults(results);
        return results;
    });
}

// do some housekeeping when all tasks are complete          
await Task.WhenAll(_tasks);

正如@Stephen 已经提到的,问题中的代码不是最小的、完整的和可验证的。我冒昧地做了一些假设,在我看来你的代码应该是这样的:

public async Task<AnalysisResultArgs[]> MainMethod()
{
    var _beansList = new List<AnalysisResultArgs>();

    for(int i=0; i< 99; i++) // Considering 100 records
        _beansList.Add(new AnalysisResultArgs());

    var _tasks = new Task<AnalysisResultArgs>[_beansList.Count];

    for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
    {
        var local = loopCnt;
        _tasks[local] = Task.Run(async() => await ReportResults(_beansList[local].Analyze(new AnalysisResultArgs())));
    }

    return await Task.WhenAll(_tasks);
}

private async Task<AnalysisResultArgs> ReportResults(Task<AnalysisResultArgs> task)
{
    await Task.Delay(1000);
    return await Task.FromResult(new AnalysisResultArgs());
}

public class AnalysisResultArgs
{   
    public async Task<AnalysisResultArgs> Analyze(AnalysisResultArgs newBeanData)
    {
        await Task.Delay(1000);
        return await Task.FromResult(new AnalysisResultArgs());
    }
}

假设/其他细节:

  1. Task.DelayTask.FromResult 只是实际逻辑的占位符。
  2. 调用 AnalyzeReportResults 中的所有方法都是异步的
  3. 循环代码可能是:

    _tasks[local] = ReportResults(_beansList[local].Analyze(new 
             AnalysisResultArgs()));
    

但如果我们使用 Task.Run 启动 Task,那么 async, await 有助于释放调用线程池线程/同步上下文。它有助于提高系统的可扩展性。

  1. 假设_beansListList<AnalysisResultArgs>类型,但可以根据实际需要修改
  2. ReportResults 可以修改为采用 Func<Task<AnalysisResultArgs>> func,而不是简单的 dTask<AnalysisResultArgs>,因此在方法内部执行 await func(),然后调用代码可能是真正异步的,如下所示:

      _tasks[local] = Task.Run(async() => await ReportResults(async() => await 
      _beansList[local].Analyze(new AnalysisResultArgs())));