运行 以安全的方式在企业应用程序中多重任务<>
Running multipe Task<> in an enterprise application in a safe way
我正在为可以实例化一系列 "agents" 做一些有用的事情的产品设计软件架构。
假设每个代理实现一个具有以下功能的接口:
Task AsyncRun(CancellationToken token)
因为这些代理正在做很多 I/O,因此具有 async
功能可能有些意义。此外,如果没有异常或明确取消发生,AsyncRun 应该永远不会完成。
现在的问题是:主程序必须 运行 在多个代理上执行此操作,我想知道 运行 宁多任务的正确方法,向每个单独的完成发出信号(即由于 cancellation/errors ):
例如,我正在考虑像这样的无限循环
//.... all task cretaed are in the array tasks..
while(true)
{
await Task.WhenAny(tasks)
//.... check each single task for understand which one(s) exited
// re-run the task if requested replacing in the array tasks
}
但不确定它是否正确(甚至是最佳方式)
此外,我想知道这是否是正确的模式,特别是因为实施者可能不匹配 RunAsync
并进行阻塞调用,在这种情况下整个应用程序将挂起。
为了知道任务是成功完成,还是被取消或出错,您可以使用延续。一旦任务完成,无论是因为失败、取消还是完成,都会立即调用延续。 :
using (var tokenSource = new CancellationTokenSource())
{
IEnumerable<IAgent> agents; // TODO: initialize
var tasks = new List<Task>();
foreach (var agent in agents)
{
var task = agent.RunAsync(tokenSource.Token)
.ContinueWith(t =>
{
if (t.IsCanceled)
{
// Do something if cancelled.
}
else if (t.IsFaulted)
{
// Do something if faulted (with t.Exception)
}
else
{
// Do something if the task has completed.
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
最后你会等待继续的任务。另见 this answer.
如果您担心 IAgent
实现会创建阻塞调用并希望防止应用程序挂起,您可以将对异步方法的调用包装在 Task.Run
中。这样对代理的调用是在线程池上执行的,因此是 non-blocking:
var task = Task.Run(async () =>
await agent.RunAsync(tokenSource.Token)
.ContinueWith(t =>
{
// Same as above
}));
例如,您可能想使用 Task.Factory.StartNew
来将任务标记为长时间运行。
// re-run the task if requested replacing in the array tasks
这是我考虑更改的第一件事。 不要 让应用程序处理它自己的 "restarting" 要好得多。如果操作失败,则无法保证应用程序可以恢复。对于任何 language/runtime.
中的任何类型的操作都是如此
一个更好的解决方案是让另一个应用程序重新启动这个。允许异常传播(如果可能,记录它),并允许它终止应用程序。然后根据需要重新启动 "manager" 进程(实际上是一个单独的可执行进程)。这是所有现代 high-availability 系统的工作方式,从 Win32 服务管理器到 ASP.NET,再到 Kubernetes 容器管理器,再到 Azure Functions 运行时。
请注意,如果您确实想走这条路,将任务拆分到不同的进程可能是有意义的,这样它们就可以独立重新启动。这样一个人重启就不会导致其他人重启。
但是,如果您想将所有任务都放在同一个进程中,那么您的解决方案就可以了。如果您在流程开始时有已知数量的任务,并且该数量不会改变(除非它们失败),那么您可以通过排除重新启动并使用 Task.WhenAll
而不是来简化代码Task.WhenAny
:
async Task RunAsync(Func<CancellationToken, Task> work, CancellationToken token)
{
while (true)
{
try { await work(token); }
catch
{
// log...
}
if (we-should-not-restart)
break;
}
}
List<Func<CancellationToken, Task>> workToDo = ...;
var tasks = workToDo.Select(work => RunAsync(work, token));
await Task.WhenAll(tasks);
// Only gets here if they all complete/fail and were not restarted.
the implementer can mismatch the RunAsync and do a blocking call, in which case the entire application will hang.
防止这种情况的最佳方法是将调用包装在 Task.Run
中,因此:
await work(token);
变成这样:
await Task.Run(() => work(token));
我正在为可以实例化一系列 "agents" 做一些有用的事情的产品设计软件架构。 假设每个代理实现一个具有以下功能的接口:
Task AsyncRun(CancellationToken token)
因为这些代理正在做很多 I/O,因此具有 async
功能可能有些意义。此外,如果没有异常或明确取消发生,AsyncRun 应该永远不会完成。
现在的问题是:主程序必须 运行 在多个代理上执行此操作,我想知道 运行 宁多任务的正确方法,向每个单独的完成发出信号(即由于 cancellation/errors ): 例如,我正在考虑像这样的无限循环
//.... all task cretaed are in the array tasks..
while(true)
{
await Task.WhenAny(tasks)
//.... check each single task for understand which one(s) exited
// re-run the task if requested replacing in the array tasks
}
但不确定它是否正确(甚至是最佳方式)
此外,我想知道这是否是正确的模式,特别是因为实施者可能不匹配 RunAsync
并进行阻塞调用,在这种情况下整个应用程序将挂起。
为了知道任务是成功完成,还是被取消或出错,您可以使用延续。一旦任务完成,无论是因为失败、取消还是完成,都会立即调用延续。 :
using (var tokenSource = new CancellationTokenSource())
{
IEnumerable<IAgent> agents; // TODO: initialize
var tasks = new List<Task>();
foreach (var agent in agents)
{
var task = agent.RunAsync(tokenSource.Token)
.ContinueWith(t =>
{
if (t.IsCanceled)
{
// Do something if cancelled.
}
else if (t.IsFaulted)
{
// Do something if faulted (with t.Exception)
}
else
{
// Do something if the task has completed.
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
最后你会等待继续的任务。另见 this answer.
如果您担心 IAgent
实现会创建阻塞调用并希望防止应用程序挂起,您可以将对异步方法的调用包装在 Task.Run
中。这样对代理的调用是在线程池上执行的,因此是 non-blocking:
var task = Task.Run(async () =>
await agent.RunAsync(tokenSource.Token)
.ContinueWith(t =>
{
// Same as above
}));
例如,您可能想使用 Task.Factory.StartNew
来将任务标记为长时间运行。
// re-run the task if requested replacing in the array tasks
这是我考虑更改的第一件事。 不要 让应用程序处理它自己的 "restarting" 要好得多。如果操作失败,则无法保证应用程序可以恢复。对于任何 language/runtime.
中的任何类型的操作都是如此一个更好的解决方案是让另一个应用程序重新启动这个。允许异常传播(如果可能,记录它),并允许它终止应用程序。然后根据需要重新启动 "manager" 进程(实际上是一个单独的可执行进程)。这是所有现代 high-availability 系统的工作方式,从 Win32 服务管理器到 ASP.NET,再到 Kubernetes 容器管理器,再到 Azure Functions 运行时。
请注意,如果您确实想走这条路,将任务拆分到不同的进程可能是有意义的,这样它们就可以独立重新启动。这样一个人重启就不会导致其他人重启。
但是,如果您想将所有任务都放在同一个进程中,那么您的解决方案就可以了。如果您在流程开始时有已知数量的任务,并且该数量不会改变(除非它们失败),那么您可以通过排除重新启动并使用 Task.WhenAll
而不是来简化代码Task.WhenAny
:
async Task RunAsync(Func<CancellationToken, Task> work, CancellationToken token)
{
while (true)
{
try { await work(token); }
catch
{
// log...
}
if (we-should-not-restart)
break;
}
}
List<Func<CancellationToken, Task>> workToDo = ...;
var tasks = workToDo.Select(work => RunAsync(work, token));
await Task.WhenAll(tasks);
// Only gets here if they all complete/fail and were not restarted.
the implementer can mismatch the RunAsync and do a blocking call, in which case the entire application will hang.
防止这种情况的最佳方法是将调用包装在 Task.Run
中,因此:
await work(token);
变成这样:
await Task.Run(() => work(token));