如何并行执行嵌套 async/await 代码,同时在 await continuations 上维护相同的线程?
How to execute nested async/await code in parallel while maintaining the same thread on await continuations?
这可能是我写过的最糟糕的 Whosebug 标题。我实际上想要做的是执行一个异步方法,该方法使用 async/await 约定(并且它本身包含额外的等待调用)从同步方法中并行多次,同时在每个分支的执行过程中保持相同的线程并行执行,包括所有等待延续。换句话说,我想同步执行一些异步代码,但我想并行执行多次。现在你可以明白为什么标题这么糟糕了。也许这最好用一些代码来说明......
假设我有以下内容:
public class MyAsyncCode
{
async Task MethodA()
{
// Do some stuff...
await MethodB();
// Some other stuff
}
async Task MethodB()
{
// Do some stuff...
await MethodC();
// Some other stuff
}
async Task MethodC()
{
// Do some stuff...
}
}
调用方是同步的(来自控制台应用程序)。让我尝试通过尝试使用 Task.WaitAll(...)
和包装器任务来说明我正在尝试做什么:
public void MyCallingMethod()
{
List<Task> tasks = new List<Task>();
for(int c = 0 ; c < 4 ; c++)
{
MyAsyncCode asyncCode = new MyAsyncCode();
tasks.Add(Task.Run(() => asyncCode.MethodA()));
}
Task.WaitAll(tasks.ToArray());
}
期望的行为是 MethodA
、MethodB
和 MethodC
在继续之前和之后都在同一个线程上 运行,并且对于这在 4 个不同的线程上并行发生 4 次。换句话说,我想删除我的 await
调用的异步行为,因为我正在从调用者并行调用。
现在,在我进一步讨论之前,我明白异步代码和 parallel/multi-threaded 代码之间存在差异,并且前者并不暗示或暗示后者。我也知道实现此行为的最简单方法是删除 async/await 声明。不幸的是,我没有这样做的选项(它在一个库中)并且有 原因 为什么我需要所有的延续都在同一个线程上(必须与 poor所述图书馆的设计)。但更重要的是,这激起了我的兴趣,现在我想从学术角度了解。
我已经尝试 运行 使用 PLINQ 并使用 .AsParallel().Select(x => x.MethodA().Result)
立即执行任务。我还尝试使用随处可见的 AsyncHelper
class,它实际上只使用了 .Unwrap().GetAwaiter().GetResult()
。我还尝试了其他一些东西,但似乎无法获得所需的行为。我要么以同一线程上的所有调用结束(这显然不是并行的),要么以在不同线程上执行的延续结束。
我尝试做的事情是否可行,或者 async/await 和 TPL 是否差别太大(尽管两者都基于 Task
s)?
你可以创建4个独立的线程,每个线程执行一个有限并发(实际上是完全没有并发)的MethodA TaskScheduler。这将确保线程创建的每个任务和延续任务都将由该线程执行。
public void MyCallingMethod()
{
CancellationToken csl = new CancellationToken();
var threads = Enumerable.Range(0, 4).Select(p =>
{
var t = new Thread(_ =>
{
Task.Factory.StartNew(() => MethodA(), csl, TaskCreationOptions.None,
new LimitedConcurrencyLevelTaskScheduler(1)).Wait();
});
t.Start();
return t;
}).ToArray();
//You can block the main thread and wait for the other threads here...
}
当然,这并不能确保您达到 4 级并行度。
您可以在 MSDN 中看到此类 TaskScheduler 的实现 - https://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx
您调用的方法没有使用ConfigureAwait(false)
。这意味着我们可以强制继续在我们喜欢的上下文中恢复。选项:
- 安装单线程同步上下文。我相信 Nito.Async 有。
- 使用自定义
TaskScheduler
。 await
查看 TaskScheduler.Current
并在该调度程序处恢复(如果它不是默认的)。
我不确定这两种选择是否各有利弊。我认为选项 2 的范围界定更容易。选项 2 看起来像:
Task.Factory.StartNew(
() => MethodA()
, new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler).Unwrap();
为每个并行调用调用一次并使用 Task.WaitAll
加入所有这些任务。也许你也应该处理那个调度程序。
我在这里 (ab) 使用 ConcurrentExclusiveSchedulerPair
来获得单线程调度程序。
如果这些方法不是特别 CPU 密集,您可以对所有方法使用相同的 scheduler/thread。
这可能是我写过的最糟糕的 Whosebug 标题。我实际上想要做的是执行一个异步方法,该方法使用 async/await 约定(并且它本身包含额外的等待调用)从同步方法中并行多次,同时在每个分支的执行过程中保持相同的线程并行执行,包括所有等待延续。换句话说,我想同步执行一些异步代码,但我想并行执行多次。现在你可以明白为什么标题这么糟糕了。也许这最好用一些代码来说明......
假设我有以下内容:
public class MyAsyncCode
{
async Task MethodA()
{
// Do some stuff...
await MethodB();
// Some other stuff
}
async Task MethodB()
{
// Do some stuff...
await MethodC();
// Some other stuff
}
async Task MethodC()
{
// Do some stuff...
}
}
调用方是同步的(来自控制台应用程序)。让我尝试通过尝试使用 Task.WaitAll(...)
和包装器任务来说明我正在尝试做什么:
public void MyCallingMethod()
{
List<Task> tasks = new List<Task>();
for(int c = 0 ; c < 4 ; c++)
{
MyAsyncCode asyncCode = new MyAsyncCode();
tasks.Add(Task.Run(() => asyncCode.MethodA()));
}
Task.WaitAll(tasks.ToArray());
}
期望的行为是 MethodA
、MethodB
和 MethodC
在继续之前和之后都在同一个线程上 运行,并且对于这在 4 个不同的线程上并行发生 4 次。换句话说,我想删除我的 await
调用的异步行为,因为我正在从调用者并行调用。
现在,在我进一步讨论之前,我明白异步代码和 parallel/multi-threaded 代码之间存在差异,并且前者并不暗示或暗示后者。我也知道实现此行为的最简单方法是删除 async/await 声明。不幸的是,我没有这样做的选项(它在一个库中)并且有 原因 为什么我需要所有的延续都在同一个线程上(必须与 poor所述图书馆的设计)。但更重要的是,这激起了我的兴趣,现在我想从学术角度了解。
我已经尝试 运行 使用 PLINQ 并使用 .AsParallel().Select(x => x.MethodA().Result)
立即执行任务。我还尝试使用随处可见的 AsyncHelper
class,它实际上只使用了 .Unwrap().GetAwaiter().GetResult()
。我还尝试了其他一些东西,但似乎无法获得所需的行为。我要么以同一线程上的所有调用结束(这显然不是并行的),要么以在不同线程上执行的延续结束。
我尝试做的事情是否可行,或者 async/await 和 TPL 是否差别太大(尽管两者都基于 Task
s)?
你可以创建4个独立的线程,每个线程执行一个有限并发(实际上是完全没有并发)的MethodA TaskScheduler。这将确保线程创建的每个任务和延续任务都将由该线程执行。
public void MyCallingMethod()
{
CancellationToken csl = new CancellationToken();
var threads = Enumerable.Range(0, 4).Select(p =>
{
var t = new Thread(_ =>
{
Task.Factory.StartNew(() => MethodA(), csl, TaskCreationOptions.None,
new LimitedConcurrencyLevelTaskScheduler(1)).Wait();
});
t.Start();
return t;
}).ToArray();
//You can block the main thread and wait for the other threads here...
}
当然,这并不能确保您达到 4 级并行度。
您可以在 MSDN 中看到此类 TaskScheduler 的实现 - https://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx
您调用的方法没有使用ConfigureAwait(false)
。这意味着我们可以强制继续在我们喜欢的上下文中恢复。选项:
- 安装单线程同步上下文。我相信 Nito.Async 有。
- 使用自定义
TaskScheduler
。await
查看TaskScheduler.Current
并在该调度程序处恢复(如果它不是默认的)。
我不确定这两种选择是否各有利弊。我认为选项 2 的范围界定更容易。选项 2 看起来像:
Task.Factory.StartNew(
() => MethodA()
, new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler).Unwrap();
为每个并行调用调用一次并使用 Task.WaitAll
加入所有这些任务。也许你也应该处理那个调度程序。
我在这里 (ab) 使用 ConcurrentExclusiveSchedulerPair
来获得单线程调度程序。
如果这些方法不是特别 CPU 密集,您可以对所有方法使用相同的 scheduler/thread。