对多个任务使用 ContinueWith
Using ContinueWith with Multiple Tasks
这与我想象的不太一样,需要很简单,启动一些任务来对一个对象进行操作。每个任务一个唯一的对象。第二部分是每个任务报告结果时的ContinueWith
。但是,我没有得到 WhenAll
类型的行为。希望有人能帮我弄清楚。
_tasks = new Task<AnalysisResultArgs>[_beansList.Count];
for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
{
_tasks[loopCnt] = Task<AnalysisResultArgs>.Factory.StartNew(() =>
{
return _beansList[loopCnt].Analyze(newBeanData);
});
await _tasks[loopCnt].ContinueWith(ReportResults,
TaskContinuationOptions.RunContinuationsAsynchronously)
// do some housekeeping when all tasks are complete
}
private void ReportResults(Task<AnalysisResultArgs> task)
{
/* Do some serial operations
}
据我了解,_beansList.Count
任务将启动,并且通过在 ContinueWith
上使用 await
,在所有任务完成之前,内务处理工作不会执行。我无法阻止,因为我需要确保能够限制传入数据以防止等待执行的任务过多。
我哪里搞砸了,等待实际上完成了,内务处理得到 运行,即使不是所有任务都 运行 完成。
您不是在等待所有任务,而是在循环中等待继续。您应该为此使用 Task.WhenAll
方法。另外,如果可以 运行 它在任务中,为什么还需要继续?像这样简化您的代码:
private void ReportResults(AnalysisResultArgs results)
{
/* Do some serial operations */
}
...
_tasks = new Task<AnalysisResultArgs>[_beansList.Count];
for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
{
var count = loopCnt;
_tasks[count] = Task.Run(() =>
{
var results = _beansList[count].Analyze(newBeanData);
ReportResults(results);
return results;
});
}
// do some housekeeping when all tasks are complete
await Task.WhenAll(_tasks);
正如@Stephen 已经提到的,问题中的代码不是最小的、完整的和可验证的。我冒昧地做了一些假设,在我看来你的代码应该是这样的:
public async Task<AnalysisResultArgs[]> MainMethod()
{
var _beansList = new List<AnalysisResultArgs>();
for(int i=0; i< 99; i++) // Considering 100 records
_beansList.Add(new AnalysisResultArgs());
var _tasks = new Task<AnalysisResultArgs>[_beansList.Count];
for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
{
var local = loopCnt;
_tasks[local] = Task.Run(async() => await ReportResults(_beansList[local].Analyze(new AnalysisResultArgs())));
}
return await Task.WhenAll(_tasks);
}
private async Task<AnalysisResultArgs> ReportResults(Task<AnalysisResultArgs> task)
{
await Task.Delay(1000);
return await Task.FromResult(new AnalysisResultArgs());
}
public class AnalysisResultArgs
{
public async Task<AnalysisResultArgs> Analyze(AnalysisResultArgs newBeanData)
{
await Task.Delay(1000);
return await Task.FromResult(new AnalysisResultArgs());
}
}
假设/其他细节:
Task.Delay
和 Task.FromResult
只是实际逻辑的占位符。
- 调用
Analyze
和 ReportResults
中的所有方法都是异步的
循环代码可能是:
_tasks[local] = ReportResults(_beansList[local].Analyze(new
AnalysisResultArgs()));
但如果我们使用 Task.Run
启动 Task
,那么 async, await
有助于释放调用线程池线程/同步上下文。它有助于提高系统的可扩展性。
- 假设
_beansList
为List<AnalysisResultArgs>
类型,但可以根据实际需要修改
ReportResults
可以修改为采用 Func<Task<AnalysisResultArgs>> func
,而不是简单的 dTask<AnalysisResultArgs>
,因此在方法内部执行 await func()
,然后调用代码可能是真正异步的,如下所示:
_tasks[local] = Task.Run(async() => await ReportResults(async() => await
_beansList[local].Analyze(new AnalysisResultArgs())));
这与我想象的不太一样,需要很简单,启动一些任务来对一个对象进行操作。每个任务一个唯一的对象。第二部分是每个任务报告结果时的ContinueWith
。但是,我没有得到 WhenAll
类型的行为。希望有人能帮我弄清楚。
_tasks = new Task<AnalysisResultArgs>[_beansList.Count];
for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
{
_tasks[loopCnt] = Task<AnalysisResultArgs>.Factory.StartNew(() =>
{
return _beansList[loopCnt].Analyze(newBeanData);
});
await _tasks[loopCnt].ContinueWith(ReportResults,
TaskContinuationOptions.RunContinuationsAsynchronously)
// do some housekeeping when all tasks are complete
}
private void ReportResults(Task<AnalysisResultArgs> task)
{
/* Do some serial operations
}
据我了解,_beansList.Count
任务将启动,并且通过在 ContinueWith
上使用 await
,在所有任务完成之前,内务处理工作不会执行。我无法阻止,因为我需要确保能够限制传入数据以防止等待执行的任务过多。
我哪里搞砸了,等待实际上完成了,内务处理得到 运行,即使不是所有任务都 运行 完成。
您不是在等待所有任务,而是在循环中等待继续。您应该为此使用 Task.WhenAll
方法。另外,如果可以 运行 它在任务中,为什么还需要继续?像这样简化您的代码:
private void ReportResults(AnalysisResultArgs results)
{
/* Do some serial operations */
}
...
_tasks = new Task<AnalysisResultArgs>[_beansList.Count];
for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
{
var count = loopCnt;
_tasks[count] = Task.Run(() =>
{
var results = _beansList[count].Analyze(newBeanData);
ReportResults(results);
return results;
});
}
// do some housekeeping when all tasks are complete
await Task.WhenAll(_tasks);
正如@Stephen 已经提到的,问题中的代码不是最小的、完整的和可验证的。我冒昧地做了一些假设,在我看来你的代码应该是这样的:
public async Task<AnalysisResultArgs[]> MainMethod()
{
var _beansList = new List<AnalysisResultArgs>();
for(int i=0; i< 99; i++) // Considering 100 records
_beansList.Add(new AnalysisResultArgs());
var _tasks = new Task<AnalysisResultArgs>[_beansList.Count];
for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
{
var local = loopCnt;
_tasks[local] = Task.Run(async() => await ReportResults(_beansList[local].Analyze(new AnalysisResultArgs())));
}
return await Task.WhenAll(_tasks);
}
private async Task<AnalysisResultArgs> ReportResults(Task<AnalysisResultArgs> task)
{
await Task.Delay(1000);
return await Task.FromResult(new AnalysisResultArgs());
}
public class AnalysisResultArgs
{
public async Task<AnalysisResultArgs> Analyze(AnalysisResultArgs newBeanData)
{
await Task.Delay(1000);
return await Task.FromResult(new AnalysisResultArgs());
}
}
假设/其他细节:
Task.Delay
和Task.FromResult
只是实际逻辑的占位符。- 调用
Analyze
和ReportResults
中的所有方法都是异步的 循环代码可能是:
_tasks[local] = ReportResults(_beansList[local].Analyze(new AnalysisResultArgs()));
但如果我们使用 Task.Run
启动 Task
,那么 async, await
有助于释放调用线程池线程/同步上下文。它有助于提高系统的可扩展性。
- 假设
_beansList
为List<AnalysisResultArgs>
类型,但可以根据实际需要修改 ReportResults
可以修改为采用Func<Task<AnalysisResultArgs>> func
,而不是简单的 dTask<AnalysisResultArgs>
,因此在方法内部执行await func()
,然后调用代码可能是真正异步的,如下所示:_tasks[local] = Task.Run(async() => await ReportResults(async() => await _beansList[local].Analyze(new AnalysisResultArgs())));