如何并行执行嵌套 async/await 代码,同时在 await continuations 上维护相同的线程?

How to execute nested async/await code in parallel while maintaining the same thread on await continuations?

这可能是我写过的最糟糕的 Whosebug 标题。我实际上想要做的是执行一个异步方法,该方法使用 async/await 约定(并且它本身包含额外的等待调用)从同步方法中并行多次,同时在每个分支的执行过程中保持相同的线程并行执行,包括所有等待延续。换句话说,我想同步执行一些异步代码,但我想并行执行多次。现在你可以明白为什么标题这么糟糕了。也许这最好用一些代码来说明......

假设我有以下内容:

public class MyAsyncCode
{
    async Task MethodA()
    {
        // Do some stuff...
        await MethodB();
        // Some other stuff
    }

    async Task MethodB()
    {
        // Do some stuff...
        await MethodC();
        // Some other stuff
    }

    async Task MethodC()
    {
        // Do some stuff...
    }
}

调用方是同步的(来自控制台应用程序)。让我尝试通过尝试使用 Task.WaitAll(...) 和包装器任务来说明我正在尝试做什么:

public void MyCallingMethod()
{
    List<Task> tasks = new List<Task>();
    for(int c = 0 ; c < 4 ; c++)
    {
        MyAsyncCode asyncCode = new MyAsyncCode();
        tasks.Add(Task.Run(() => asyncCode.MethodA()));
    }
    Task.WaitAll(tasks.ToArray());
}

期望的行为是 MethodAMethodBMethodC 在继续之前和之后都在同一个线程上 运行,并且对于这在 4 个不同的线程上并行发生 4 次。换句话说,我想删除我的 await 调用的异步行为,因为我正在从调用者并行调用。

现在,在我进一步讨论之前,我明白异步代码和 parallel/multi-threaded 代码之间存在差异,并且前者并不暗示或暗示后者。我也知道实现此行为的最简单方法是删除 async/await 声明。不幸的是,我没有这样做的选项(它在一个库中)并且有 原因 为什么我需要所有的延续都在同一个线程上(必须与 poor所述图书馆的设计)。但更重要的是,这激起了我的兴趣,现在我想从学术角度了解。

我已经尝试 运行 使用 PLINQ 并使用 .AsParallel().Select(x => x.MethodA().Result) 立即执行任务。我还尝试使用随处可见的 AsyncHelper class,它实际上只使用了 .Unwrap().GetAwaiter().GetResult()。我还尝试了其他一些东西,但似乎无法获得所需的行为。我要么以同一线程上的所有调用结束(这显然不是并行的),要么以在不同线程上执行的延续结束。

我尝试做的事情是否可行,或者 async/await 和 TPL 是否差别太大(尽管两者都基于 Tasks)?

你可以创建4个独立的线程,每个线程执行一个有限并发(实际上是完全没有并发)的MethodA TaskScheduler。这将确保线程创建的每个任务和延续任务都将由该线程执行。

    public void MyCallingMethod()
    {
        CancellationToken csl = new CancellationToken();
        var threads = Enumerable.Range(0, 4).Select(p =>
            {
                var t = new Thread(_ =>
                    {
                        Task.Factory.StartNew(() => MethodA(), csl, TaskCreationOptions.None,
                            new LimitedConcurrencyLevelTaskScheduler(1)).Wait();
                    });
                t.Start();  
                return t;
            }).ToArray();
        //You can block the main thread and wait for the other threads here...
    }

当然,这并不能确保您达到 4 级并行度。

您可以在 MSDN 中看到此类 TaskScheduler 的实现 - https://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx

您调用的方法没有使用ConfigureAwait(false)。这意味着我们可以强制继续在我们喜欢的上下文中恢复。选项:

  1. 安装单线程同步上下文。我相信 Nito.Async 有。
  2. 使用自定义 TaskSchedulerawait 查看 TaskScheduler.Current 并在该调度程序处恢复(如果它不是默认的)。

我不确定这两种选择是否各有利弊。我认为选项 2 的范围界定更容易。选项 2 看起来像:

Task.Factory.StartNew(
    () => MethodA()
    , new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler).Unwrap();

为每个并行调用调用一次并使用 Task.WaitAll 加入所有这些任务。也许你也应该处理那个调度程序。

我在这里 (ab) 使用 ConcurrentExclusiveSchedulerPair 来获得单线程调度程序。

如果这些方法不是特别 CPU 密集,您可以对所有方法使用相同的 scheduler/thread。