运行 以安全的方式在企业应用程序中多重任务<>

Running multipe Task<> in an enterprise application in a safe way

我正在为可以实例化一系列 "agents" 做一些有用的事情的产品设计软件架构。 假设每个代理实现一个具有以下功能的接口:

Task AsyncRun(CancellationToken token)

因为这些代理正在做很多 I/O,因此具有 async 功能可能有些意义。此外,如果没有异常或明确取消发生,AsyncRun 应该永远不会完成。

现在的问题是:主程序必须 运行 在多个代理上执行此操作,我想知道 运行 宁多任务的正确方法,向每个单独的完成发出信号(即由于 cancellation/errors ): 例如,我正在考虑像这样的无限循环

//.... all task cretaed are in the array tasks..
while(true)
{
     await Task.WhenAny(tasks)
     //.... check each single task for understand which one(s) exited
     // re-run the task if requested replacing in the array tasks
}

但不确定它是否正确(甚至是最佳方式) 此外,我想知道这是否是正确的模式,特别是因为实施者可能不匹配 RunAsync 并进行阻塞调用,在这种情况下整个应用程序将挂起。

为了知道任务是成功完成,还是被取消或出错,您可以使用延续。一旦任务完成,无论是因为失败、取消还是完成,都会立即调用延续。 :

using (var tokenSource = new CancellationTokenSource())
{
    IEnumerable<IAgent> agents; // TODO: initialize

    var tasks = new List<Task>();
    foreach (var agent in agents)
    {
        var task = agent.RunAsync(tokenSource.Token)
            .ContinueWith(t =>
            {
                if (t.IsCanceled)
                {
                    // Do something if cancelled.
                }
                else if (t.IsFaulted)
                {
                    // Do something if faulted (with t.Exception)
                }
                else
                {
                    // Do something if the task has completed.
                }
            });

        tasks.Add(task);
    }

    await Task.WhenAll(tasks);
}

最后你会等待继续的任务。另见 this answer.

如果您担心 IAgent 实现会创建阻塞调用并希望防止应用程序挂起,您可以将对异步方法的调用包装在 Task.Run 中。这样对代理的调用是在线程池上执行的,因此是 non-blocking:

var task = Task.Run(async () =>
    await agent.RunAsync(tokenSource.Token)
        .ContinueWith(t =>
        {
            // Same as above
        }));

例如,您可能想使用 Task.Factory.StartNew 来将任务标记为长时间运行。

// re-run the task if requested replacing in the array tasks

这是我考虑更改的第一件事。 不要 让应用程序处理它自己的 "restarting" 要好得多。如果操作失败,则无法保证应用程序可以恢复。对于任何 language/runtime.

中的任何类型的操作都是如此

一个更好的解决方案是让另一个应用程序重新启动这个。允许异常传播(如果可能,记录它),并允许它终止应用程序。然后根据需要重新启动 "manager" 进程(实际上是一个单独的可执行进程)。这是所有现代 high-availability 系统的工作方式,从 Win32 服务管理器到 ASP.NET,再到 Kubernetes 容器管理器,再到 Azure Functions 运行时。

请注意,如果您确实想走这条路,将任务拆分到不同的进程可能是有意义的,这样它们就可以独立重新启动。这样一个人重启就不会导致其他人重启。

但是,如果您想将所有任务都放在同一个进程中,那么您的解决方案就可以了。如果您在流程开始时有已知数量的任务,并且该数量不会改变(除非它们失败),那么您可以通过排除重新启动并使用 Task.WhenAll 而不是来简化代码Task.WhenAny:

async Task RunAsync(Func<CancellationToken, Task> work, CancellationToken token)
{
  while (true)
  {
    try { await work(token); }
    catch
    {
      // log...
    }

    if (we-should-not-restart)
      break;
  }
}

List<Func<CancellationToken, Task>> workToDo = ...;
var tasks = workToDo.Select(work => RunAsync(work, token));
await Task.WhenAll(tasks);
// Only gets here if they all complete/fail and were not restarted.

the implementer can mismatch the RunAsync and do a blocking call, in which case the entire application will hang.

防止这种情况的最佳方法是将调用包装在 Task.Run 中,因此:

await work(token);

变成这样:

await Task.Run(() => work(token));