在 .NET 中将 Task.WaitAll() 用于服务时出现问题
Issue using Task.WaitAll() for services in .NET
我有一个使用以下 OnStart() 方法的服务:
protected override void OnStart(string[] args)
{
this.manager.StartManager();
this.log.LogEvent(this.id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Service started"));
}
和 OnStop() 方法:
protected override void OnStop()
{
this.manager.CancellationTokenSource.Cancel();
this.log.LogEvent(this.id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Service stopped"));
}
StartManager() 方法实现如下:
public void StartManager()
{
foreach (var reportGeneratorThread in this.ReportGenerators)
{
reportGeneratorThread.Start();
Thread.Sleep(1000);
}
try
{
Task.WaitAll(this.Tasks.ToArray());
}
catch (AggregateException e)
{
foreach (var v in e.InnerExceptions)
{
var taskException = v as TaskCanceledException;
if (v != taskException)
{
foreach (var reportGenerator in this.ReportGenerators)
{
if (reportGenerator.Task.IsFaulted)
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Unhandled exception from Task " + reportGenerator.Task.Id));
ReportGeneratorThread faultedReportGeneratorThread = this.GetThreadById(reportGenerator.Task.Id);
var index = this.ReportGenerators.FindIndex(t => t.Equals(faultedReportGeneratorThread));
this.DisposeFaultedThread(faultedReportGeneratorThread, index);
this.ReportGenerators[index].Start();
this.logger.LogDebug(string.Format(CultureInfo.InvariantCulture, "Faulted Task, and instance of ReportGeneratorThread is recreated and corresponding task is started"));
break;
}
}
}
else if (taskException != null)
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Task " + taskException.Task.Id + " has thrown a Task Canceled Exception"));
}
}
}
}
问题出现在我的 StartManager() 方法中,因为 reportGeneratorThread.Start() 方法正在启动一个在 while 循环中连续生成报告的任务,该任务只能在抛出取消标记后中止我将它放入我的服务 OnStop() 方法中。
因此,当我测试我的服务时,程序无法超出 Task.WaitAll(),这会阻止我完成 OnStart() 方法,并且我收到以下错误:
Error 1053: the service did not respond to the start or control request in a timely fashion
我仍然需要管理我的任务,所以我实际上需要 Task.WaitAll() 方法,但我也需要解决这个问题。在这种情况下如何完成 OnStart() 方法?在不改变我的代码结构的情况下,最好的方法是什么。
添加我的代码的更多部分:
这是我的任务调用的方法:
private void DoWork()
{
while (this.Running)
{
this.GenerateReport();
Thread.Sleep(Settings.Default.DefaultSleepDelay);
}
this.log.LogDebug(string.Format(CultureInfo.InvariantCulture, "Worker thread stopping."));
}
和 GenerateReport() 方法:如果服务请求取消,我将调用 Stop() 方法。此方法抛出 TaskCancelledException。
public void GenerateReport()
{
if (this.cancellationToken.IsCancellationRequested)
{
this.Stop();
}
var didwork = false;
try
{
didwork = this.reportGenerator.GenerateReport(this.getPermission, this.TaskId);
}
catch (Exception e)
{
this.log.LogError(ReportGenerator.CorrelationIdForPickingReport, string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Error during report generation."), 0, e);
}
finally
{
if (!didwork)
{
Thread.Sleep(Settings.Default.ReportGenerationInterval);
}
}
}
如果没有 a good, minimal, complete code example 清楚地说明您的问题,则很难准确理解代码的作用。至少有一些我立即看到的奇怪之处:
- 您的方法旨在重新启动报告生成器线程。但据我所知,它只能重启 one。如果您以后有更多错误,没有代码等待检测并处理它。
- 使用
WaitAll()
将阻止代码检测到一个故障,直到 所有 任务完成。因此,它们要么都需要在任何重新启动之前出现故障,要么直到您真正停止服务并取消任务时才会检测到故障。
- 只有当
taskException
是 null
时 v != taskException
才会成立。所以在我看来,稍后检查它是否为非空是没有意义的。
- 我看不出在开始每个任务之间睡 1 秒钟有什么意义。
所以我不确定是否可以知道修复所有这些代码的最佳方法是什么。也就是说,眼前的问题似乎很明确:根据设计,您的 OnStart()
方法需要及时 return,但您当前的实现无法做到这一点。这个基本问题似乎可以解决,方法是将 StartManager()
方法设为 async
方法,并使用 await
到 return 控制调用方,直到发生有趣的事情。这可能看起来像这样:
public async Task StartManager()
{
foreach (var reportGeneratorThread in this.ReportGenerators)
{
reportGeneratorThread.Start();
Thread.Sleep(1000);
}
try
{
await Task.WhenAll(this.Tasks.ToArray());
}
catch (AggregateException e)
{
foreach (var v in e.InnerExceptions)
{
var taskException = v as TaskCanceledException;
if (v != taskException)
{
foreach (var reportGenerator in this.ReportGenerators)
{
if (reportGenerator.Task.IsFaulted)
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Unhandled exception from Task " + reportGenerator.Task.Id));
ReportGeneratorThread faultedReportGeneratorThread = this.GetThreadById(reportGenerator.Task.Id);
var index = this.ReportGenerators.FindIndex(t => t.Equals(faultedReportGeneratorThread));
this.DisposeFaultedThread(faultedReportGeneratorThread, index);
this.ReportGenerators[index].Start();
this.logger.LogDebug(string.Format(CultureInfo.InvariantCulture, "Faulted Task, and instance of ReportGeneratorThread is recreated and corresponding task is started"));
break;
}
}
}
else if (taskException != null)
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Task " + taskException.Task.Id + " has thrown a Task Canceled Exception"));
}
}
}
}
然后你可以从 OnStart()
调用它,像这样:
protected override void OnStart(string[] args)
{
// Save the returned Task in a local, just as a hack
// to suppress the compiler warning about not awaiting the call.
// Alternatively, store the Task object somewhere and actually
// do something useful with it.
var _ = this.manager.StartManager();
this.log.LogEvent(this.id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Service started"));
}
请注意,上面提议的更改无法解决我提到的任何异常情况。在我看来,该方法更有用的实现可能如下所示:
public async Task StartManager()
{
foreach (var reportGeneratorThread in this.ReportGenerators)
{
reportGeneratorThread.Start();
}
while (true)
{
try
{
Task task = await Task.WhenAny(this.Tasks.ToArray());
if (task.IsFaulted)
{
// Unpack the exception. Alternatively, you could just retrieve the
// AggregateException directly from task.Exception and process it
// exactly as in the original code (i.e. enumerate the
// AggregateException.InnerExceptions collection). Note that in
// that case, you will see only a single exception in the
// InnerExceptions collection. To detect exceptions in additional
// tasks, you would need to await them as well. Fortunately,
// this will happen each time you loop back and call Task.WhenAny()
// again, since all the tasks are in the Tasks collection being
// passed to WhenAny().
await task;
}
}
catch (Exception v)
{
var taskException = v as TaskCanceledException;
if (v != taskException)
{
foreach (var reportGenerator in this.ReportGenerators)
{
if (reportGenerator.Task.IsFaulted)
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Unhandled exception from Task " + reportGenerator.Task.Id));
ReportGeneratorThread faultedReportGeneratorThread = this.GetThreadById(reportGenerator.Task.Id);
var index = this.ReportGenerators.FindIndex(t => t.Equals(faultedReportGeneratorThread));
this.DisposeFaultedThread(faultedReportGeneratorThread, index);
this.ReportGenerators[index].Start();
this.logger.LogDebug(string.Format(CultureInfo.InvariantCulture, "Faulted Task, and instance of ReportGeneratorThread is recreated and corresponding task is started"));
break;
}
}
}
else
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Task " + taskException.Task.Id + " has thrown a Task Canceled Exception"));
// Cancelling tasks...time to exit
return;
}
}
}
}
以上将循环,出现错误的任务时立即重新启动,但如果取消一个任务则完全退出。
注意: 缺少好的代码示例,以上是浏览器代码:完全未编译、未经测试。我不记得异常是如何从 WhenAll()
和 WhenAny()
传播的细节......我想我的例子是正确的,但你完全有可能需要调整它工作的细节。我希望至少以有用的方式表达了基本思想。
我有一个使用以下 OnStart() 方法的服务:
protected override void OnStart(string[] args)
{
this.manager.StartManager();
this.log.LogEvent(this.id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Service started"));
}
和 OnStop() 方法:
protected override void OnStop()
{
this.manager.CancellationTokenSource.Cancel();
this.log.LogEvent(this.id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Service stopped"));
}
StartManager() 方法实现如下:
public void StartManager()
{
foreach (var reportGeneratorThread in this.ReportGenerators)
{
reportGeneratorThread.Start();
Thread.Sleep(1000);
}
try
{
Task.WaitAll(this.Tasks.ToArray());
}
catch (AggregateException e)
{
foreach (var v in e.InnerExceptions)
{
var taskException = v as TaskCanceledException;
if (v != taskException)
{
foreach (var reportGenerator in this.ReportGenerators)
{
if (reportGenerator.Task.IsFaulted)
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Unhandled exception from Task " + reportGenerator.Task.Id));
ReportGeneratorThread faultedReportGeneratorThread = this.GetThreadById(reportGenerator.Task.Id);
var index = this.ReportGenerators.FindIndex(t => t.Equals(faultedReportGeneratorThread));
this.DisposeFaultedThread(faultedReportGeneratorThread, index);
this.ReportGenerators[index].Start();
this.logger.LogDebug(string.Format(CultureInfo.InvariantCulture, "Faulted Task, and instance of ReportGeneratorThread is recreated and corresponding task is started"));
break;
}
}
}
else if (taskException != null)
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Task " + taskException.Task.Id + " has thrown a Task Canceled Exception"));
}
}
}
}
问题出现在我的 StartManager() 方法中,因为 reportGeneratorThread.Start() 方法正在启动一个在 while 循环中连续生成报告的任务,该任务只能在抛出取消标记后中止我将它放入我的服务 OnStop() 方法中。 因此,当我测试我的服务时,程序无法超出 Task.WaitAll(),这会阻止我完成 OnStart() 方法,并且我收到以下错误:
Error 1053: the service did not respond to the start or control request in a timely fashion
我仍然需要管理我的任务,所以我实际上需要 Task.WaitAll() 方法,但我也需要解决这个问题。在这种情况下如何完成 OnStart() 方法?在不改变我的代码结构的情况下,最好的方法是什么。
添加我的代码的更多部分:
这是我的任务调用的方法:
private void DoWork()
{
while (this.Running)
{
this.GenerateReport();
Thread.Sleep(Settings.Default.DefaultSleepDelay);
}
this.log.LogDebug(string.Format(CultureInfo.InvariantCulture, "Worker thread stopping."));
}
和 GenerateReport() 方法:如果服务请求取消,我将调用 Stop() 方法。此方法抛出 TaskCancelledException。
public void GenerateReport()
{
if (this.cancellationToken.IsCancellationRequested)
{
this.Stop();
}
var didwork = false;
try
{
didwork = this.reportGenerator.GenerateReport(this.getPermission, this.TaskId);
}
catch (Exception e)
{
this.log.LogError(ReportGenerator.CorrelationIdForPickingReport, string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Error during report generation."), 0, e);
}
finally
{
if (!didwork)
{
Thread.Sleep(Settings.Default.ReportGenerationInterval);
}
}
}
如果没有 a good, minimal, complete code example 清楚地说明您的问题,则很难准确理解代码的作用。至少有一些我立即看到的奇怪之处:
- 您的方法旨在重新启动报告生成器线程。但据我所知,它只能重启 one。如果您以后有更多错误,没有代码等待检测并处理它。
- 使用
WaitAll()
将阻止代码检测到一个故障,直到 所有 任务完成。因此,它们要么都需要在任何重新启动之前出现故障,要么直到您真正停止服务并取消任务时才会检测到故障。 - 只有当
taskException
是null
时v != taskException
才会成立。所以在我看来,稍后检查它是否为非空是没有意义的。 - 我看不出在开始每个任务之间睡 1 秒钟有什么意义。
所以我不确定是否可以知道修复所有这些代码的最佳方法是什么。也就是说,眼前的问题似乎很明确:根据设计,您的 OnStart()
方法需要及时 return,但您当前的实现无法做到这一点。这个基本问题似乎可以解决,方法是将 StartManager()
方法设为 async
方法,并使用 await
到 return 控制调用方,直到发生有趣的事情。这可能看起来像这样:
public async Task StartManager()
{
foreach (var reportGeneratorThread in this.ReportGenerators)
{
reportGeneratorThread.Start();
Thread.Sleep(1000);
}
try
{
await Task.WhenAll(this.Tasks.ToArray());
}
catch (AggregateException e)
{
foreach (var v in e.InnerExceptions)
{
var taskException = v as TaskCanceledException;
if (v != taskException)
{
foreach (var reportGenerator in this.ReportGenerators)
{
if (reportGenerator.Task.IsFaulted)
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Unhandled exception from Task " + reportGenerator.Task.Id));
ReportGeneratorThread faultedReportGeneratorThread = this.GetThreadById(reportGenerator.Task.Id);
var index = this.ReportGenerators.FindIndex(t => t.Equals(faultedReportGeneratorThread));
this.DisposeFaultedThread(faultedReportGeneratorThread, index);
this.ReportGenerators[index].Start();
this.logger.LogDebug(string.Format(CultureInfo.InvariantCulture, "Faulted Task, and instance of ReportGeneratorThread is recreated and corresponding task is started"));
break;
}
}
}
else if (taskException != null)
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Task " + taskException.Task.Id + " has thrown a Task Canceled Exception"));
}
}
}
}
然后你可以从 OnStart()
调用它,像这样:
protected override void OnStart(string[] args)
{
// Save the returned Task in a local, just as a hack
// to suppress the compiler warning about not awaiting the call.
// Alternatively, store the Task object somewhere and actually
// do something useful with it.
var _ = this.manager.StartManager();
this.log.LogEvent(this.id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Service started"));
}
请注意,上面提议的更改无法解决我提到的任何异常情况。在我看来,该方法更有用的实现可能如下所示:
public async Task StartManager()
{
foreach (var reportGeneratorThread in this.ReportGenerators)
{
reportGeneratorThread.Start();
}
while (true)
{
try
{
Task task = await Task.WhenAny(this.Tasks.ToArray());
if (task.IsFaulted)
{
// Unpack the exception. Alternatively, you could just retrieve the
// AggregateException directly from task.Exception and process it
// exactly as in the original code (i.e. enumerate the
// AggregateException.InnerExceptions collection). Note that in
// that case, you will see only a single exception in the
// InnerExceptions collection. To detect exceptions in additional
// tasks, you would need to await them as well. Fortunately,
// this will happen each time you loop back and call Task.WhenAny()
// again, since all the tasks are in the Tasks collection being
// passed to WhenAny().
await task;
}
}
catch (Exception v)
{
var taskException = v as TaskCanceledException;
if (v != taskException)
{
foreach (var reportGenerator in this.ReportGenerators)
{
if (reportGenerator.Task.IsFaulted)
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Unhandled exception from Task " + reportGenerator.Task.Id));
ReportGeneratorThread faultedReportGeneratorThread = this.GetThreadById(reportGenerator.Task.Id);
var index = this.ReportGenerators.FindIndex(t => t.Equals(faultedReportGeneratorThread));
this.DisposeFaultedThread(faultedReportGeneratorThread, index);
this.ReportGenerators[index].Start();
this.logger.LogDebug(string.Format(CultureInfo.InvariantCulture, "Faulted Task, and instance of ReportGeneratorThread is recreated and corresponding task is started"));
break;
}
}
}
else
{
this.logger.LogEvent(this.Id.ToString(), string.Format(CultureInfo.InvariantCulture, "System"), string.Format(CultureInfo.InvariantCulture, "Task " + taskException.Task.Id + " has thrown a Task Canceled Exception"));
// Cancelling tasks...time to exit
return;
}
}
}
}
以上将循环,出现错误的任务时立即重新启动,但如果取消一个任务则完全退出。
注意: 缺少好的代码示例,以上是浏览器代码:完全未编译、未经测试。我不记得异常是如何从 WhenAll()
和 WhenAny()
传播的细节......我想我的例子是正确的,但你完全有可能需要调整它工作的细节。我希望至少以有用的方式表达了基本思想。