在 C#.NET 中将任意数量的任务链接在一起

Chaining arbitrary number of tasks together in C#.NET

我有什么

我有一套异步处理方法,类似于:

public class AsyncProcessor<T>
{
    //...rest of members, etc.

    public Task Process(T input)
    {
        //Some special processing, most likely inside a Task, so
        //maybe spawn a new Task, etc.
        Task task = Task.Run(/* maybe private method that does the processing*/);
        return task;
    }
}

我想要的

我想将它们全部链接在一起,以便按顺序执行

我试过的

我已尝试执行以下操作:

public class CompositeAsyncProcessor<T>
{
    private readonly IEnumerable<AsyncProcessor<T>> m_processors;

    //Constructor receives the IEnumerable<AsyncProcessor<T>> and
    //stores it in the field above.

    public Task ProcessInput(T input)
    {
        Task chainedTask = Task.CompletedTask;

        foreach (AsyncProcessor<T> processor in m_processors)
        {
            chainedTask = chainedTask.ContinueWith(t => processor.Process(input));
        }

        return chainedTask;
    }
}

出了什么问题

但是,任务没有运行顺序,因为据我了解,在对ContinueWith的调用中,processor.Process(input) 调用立即执行,方法 returns 与返回任务的状态无关。因此,所有处理任务仍然几乎同时开始。

我的问题

我的问题是我是否可以做一些优雅的事情来按顺序 链接任务(即没有执行重叠)。例如,我可以使用以下语句(我对细节有点费劲)来实现这一点吗?

chainedTask = chainedTask.ContinueWith(async t => await processor.Process(input));

此外,如果不使用 async/await,我如何做到这一点 ,仅使用 ContinueWith

我为什么要这样做?

因为我的 Processor 对象可以访问“线程不安全”资源,并可以从中请求资源。 另外,我不能只等待所有方法,因为我不知道它们有多少,所以我不能只写下必要的行代码。

线程不安全是什么意思?具体问题

因为我可能用错了这个术语,所以插图更好地解释了这一点。在我的 Processor 对象使用的 "resources" 中,它们都可以访问如下对象:

public interface IRepository
{
    void Add(object obj);

    bool Remove(object obj);

    IEnumerable<object> Items { get; }
}

目前使用的实现方式比较幼稚。所以一些 Processor 对象添加东西,而其他人检索 Items 进行检查。当然,我经常遇到的例外之一是:

InvalidOperationException: Collection was modified, enumeration operation may not execute.

我可以花一些时间锁定访问权限并预先运行枚举。然而,这是我要考虑的第二个选项,而我的第一个想法是按顺序 运行 进行处理。

为什么我必须使用任务?

虽然我在 this 案例中拥有完全控制权,但我可以说出于问题的目的,我可能无法更改基本实现,所以会发生什么如果我被任务困住了? 此外,实际上的操作代表了相对耗时的CPU绑定操作加上我正在尝试实现响应式用户界面所以我需要减轻一些异步操作的负担。虽然很有用,而且在我的大多数用例中,不需要链接多个,而是每次链接一个(或一对,但总是特定的和特定的数量,所以我能够挂钩它们没有迭代和async/await),其中一个用例最终需要将未知数量的任务链接在一起。

我目前是如何处理的

我目前处理这个问题的方法是在 ContinueWith 调用中附加对 Wait() 的调用,即:

foreach (AsyncProcessor<T> processor in m_processors)
{
    chainedTask = chainedTask.ContinueWith(t => processor.Process(input).Wait());
}

我将不胜感激关于我应该如何做到这一点,或者我如何能够更优雅地做到这一点(或者,"async-properly",可以这么说)。另外,我想知道如何在没有 async/await.

的情况下

为什么我的问题与 不同, 没有完全回答我的问题。

因为链接的问题有 两个 任务,所以解决方案是简单地写 两个 行,而我有一个任意(和未知的)任务数量,所以我需要一个合适的迭代。另外,我的方法是 而不是异步 。我现在明白(从一个简单可用的答案,已被删除)如果我将我的方法更改为 asyncawait 我可以很容易地做到这一点处理器的 Task 方法,但我仍然想知道如何在没有 async/await 语法的情况下实现这一点。

为什么我的问题不是其他链接问题的重复

因为其中 none 解释了如何使用 ContinueWith 正确链接并且我对利用 ContinueWith 不使用的解决方案感兴趣async/await 模式。我知道这种模式可能是更好的解决方案,我想了解如何(如果可能)正确使用 ContinueWith 调用进行任意链接。我现在知道我 don't need ContinueWith. The question is, how do I do it with ContinueWith?

foreach + await 将依次 运行 Process

    public async Task ProcessInputAsync(T input)
    {
        foreach (var processor in m_processors)
        {
            await processor.Process(input));
        }
    }

顺便说一句。 Process,应该叫ProcessAsync

方法 Task.ContinueWith does not understand async delegates, like Task.Run do, so when you return a Task it considers this as a normal return value and wraps it in another Task. So you end up receiving a Task<Task> instead of what you expected to get. The problem would be obvious if the AsyncProcessor.Process was returning a generic Task<T>. In this case you would get a compile error because of the illegal casting from Task<Task<T>> to Task<T>. In your case you cast from Task<Task> to Task, which is legal, since Task<TResult> 派生自 Task

解决问题很简单。您只需要将 Task<Task> 解包为一个简单的 Task,并且有一个内置方法 Unwrap 可以做到这一点。

还有一个问题需要您解决。目前,您的代码抑制了每个人 AsyncProcessor.Process 可能发生的所有异常,我认为这不是故意的。因此,您必须决定在这种情况下要遵循哪种策略。您是要立即传播第一个异常,还是更愿意将它们全部缓存起来并在最后捆绑在 AggregateException, like the Task.WhenAll 中传播?下面的例子实现了第一个策略。

public class CompositeAsyncProcessor<T>
{
    //...
    public Task Process(T input)
    {
        Task current = Task.CompletedTask;
        foreach (AsyncProcessor<T> processor in m_processors)
        {
            current = current.ContinueWith(antecessor =>
            {
                if (antecessor.IsFaulted)
                    return Task.FromException<T>(antecessor.Exception.InnerException);
                return processor.Process(input);
            },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default
            ).Unwrap();
        }
        return current;
    }
}

我使用了 ContinueWith 的重载,允许配置所有选项,因为默认值并不理想。默认的 TaskContinuationOptionsNone。将它配置为 ExecuteSynchronously 可以最大限度地减少线程切换,因为每个延续都将 运行 在完成前一个线程的同一线程中。

默认任务计划程序是 TaskScheduler.Current。通过指定 TaskScheduler.Default,您可以明确表示您希望线程池线程中的延续 运行(对于某些无法同步 运行 的异常情况)。 TaskScheduler.Current 是特定于上下文的,如果它让您感到惊讶,那将不是一个好方法。

如您所见,老式 ContinueWith 方法存在很多问题。在循环中使用现代 await 更容易实现,并且更难出错。