在实施 producer/consumer 模式时使用 Task.Yield 克服线程池饥饿
Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern
回答问题:Task.Yield - real usages?
我建议使用 Task.Yield 允许池线程被其他任务重用。在这样的模式中:
CancellationTokenSource cts;
void Start()
{
cts = new CancellationTokenSource();
// run async operation
var task = Task.Run(() => SomeWork(cts.Token), cts.Token);
// wait for completion
// after the completion handle the result/ cancellation/ errors
}
async Task<int> SomeWork(CancellationToken cancellationToken)
{
int result = 0;
bool loopAgain = true;
while (loopAgain)
{
// do something ... means a substantial work or a micro batch here - not processing a single byte
loopAgain = /* check for loop end && */ cancellationToken.IsCancellationRequested;
if (loopAgain) {
// reschedule the task to the threadpool and free this thread for other waiting tasks
await Task.Yield();
}
}
cancellationToken.ThrowIfCancellationRequested();
return result;
}
void Cancel()
{
// request cancelation
cts.Cancel();
}
但是一位用户写道
I don't think using Task.Yield to overcome ThreadPool starvation while
implementing producer/consumer pattern is a good idea. I suggest you
ask a separate question if you want to go into details as to why.
有人知道,为什么这不是个好主意?
您的问题的评论中还有一些优点。作为您引用的用户,我只想总结一下:使用正确的工具来完成工作。
使用 ThreadPool
感觉不是执行多个连续 CPU 绑定任务的正确工具,即使您尝试通过将它们变成状态机来组织一些协作执行,从而产生 CPU 和 await Task.Yield()
互相的时间。线程切换相当昂贵;通过在紧密循环中执行 await Task.Yield()
会增加大量开销。此外,您永远不应该接管整个 ThreadPool
,因为 .NET 框架(和底层 OS 进程)可能需要它来做其他事情。在相关说明中,TPL 甚至具有 TaskCreationOptions.LongRunning
选项,该选项请求不 运行 在 ThreadPool
线程上执行任务(相反,它会创建一个带有 new Thread()
的普通线程场景)。
也就是说,使用 custom TaskScheduler
在某些专用的池外线程上使用有限的并行性,这些线程具有线程亲和力,适用于单个 long-运行宁任务可能是另一回事。至少,await
延续将发布在同一个线程上,这应该有助于减少切换开销。这让我想起了我刚才用 ThreadAffinityTaskScheduler
.
试图解决的另一个问题
不过,根据特定情况,通常最好使用现有的行之有效且经过测试的工具。仅举几例:Parallel Class, TPL Dataflow, System.Threading.Channels, Reactive Extensions。
还有一整套现有的工业级解决方案来处理发布-订阅模式(RabbitMQ、PubNub、Redis、Azure Service Bus、Firebase Cloud Messaging (FCM)、Amazon Simple Queue Service (SQS) 等) ).
在与其他担心上下文切换及其对性能影响的用户就此问题进行了一些辩论之后。
我明白他们在担心什么。
但我的意思是:在循环中做一些事情... 是一项重要的任务 - 通常以消息处理程序的形式从队列中读取消息并进行处理它。消息处理程序通常是用户定义的,消息总线使用某种调度程序来执行它们。用户可以实现一个同步执行的处理程序(没有人知道用户会做什么),并且没有 Task.Yield 会阻塞线程以循环处理那些同步任务。
不是空话我添加了测试github:https://github.com/BBGONE/TestThreadAffinity
他们将 ThreadAffinityTaskScheduler、.NET ThreadScheduler 与 BlockingCollection 以及 .NET ThreadScheduler 与 Threading.Channels.
进行了比较
测试表明,对于 Ultra Short 作业,性能下降是
约 15%。要使用 Task.Yield 而不会降低性能(即使很小) - 不要使用极短的任务,如果任务太短,则将较短的任务组合成更大的批次。
[上下文切换的价格] = [上下文切换时长] / ([作业时长]+[上下文切换时长])。
在那种情况下,切换任务对性能的影响可以忽略不计。但它增加了更好的系统任务协作和响应能力。
对于长 运行 任务,最好使用自定义调度程序,它在自己的专用线程池上执行任务 -(如 WorkStealingTaskScheduler)。
对于混合作业 - 可以包含不同的部分 - 短 运行 CPU 绑定、异步和长 运行 代码部分。最好把任务拆分成子任务
private async Task HandleLongRunMessage(TestMessage message, CancellationToken token = default(CancellationToken))
{
// SHORT SYNCHRONOUS TASK - execute as is on the default thread (from thread pool)
CPU_TASK(message, 50);
// IO BOUND ASYNCH TASK - used as is
await Task.Delay(50);
// BUT WRAP the LONG SYNCHRONOUS TASK inside the Task
// which is scheduled on the custom thread pool
// (to save threadpool threads)
await Task.Factory.StartNew(() => {
CPU_TASK(message, 100000);
}, token, TaskCreationOptions.DenyChildAttach, _workStealingTaskScheduler);
}
回答问题:Task.Yield - real usages? 我建议使用 Task.Yield 允许池线程被其他任务重用。在这样的模式中:
CancellationTokenSource cts;
void Start()
{
cts = new CancellationTokenSource();
// run async operation
var task = Task.Run(() => SomeWork(cts.Token), cts.Token);
// wait for completion
// after the completion handle the result/ cancellation/ errors
}
async Task<int> SomeWork(CancellationToken cancellationToken)
{
int result = 0;
bool loopAgain = true;
while (loopAgain)
{
// do something ... means a substantial work or a micro batch here - not processing a single byte
loopAgain = /* check for loop end && */ cancellationToken.IsCancellationRequested;
if (loopAgain) {
// reschedule the task to the threadpool and free this thread for other waiting tasks
await Task.Yield();
}
}
cancellationToken.ThrowIfCancellationRequested();
return result;
}
void Cancel()
{
// request cancelation
cts.Cancel();
}
但是一位用户写道
I don't think using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern is a good idea. I suggest you ask a separate question if you want to go into details as to why.
有人知道,为什么这不是个好主意?
您的问题的评论中还有一些优点。作为您引用的用户,我只想总结一下:使用正确的工具来完成工作。
使用 ThreadPool
感觉不是执行多个连续 CPU 绑定任务的正确工具,即使您尝试通过将它们变成状态机来组织一些协作执行,从而产生 CPU 和 await Task.Yield()
互相的时间。线程切换相当昂贵;通过在紧密循环中执行 await Task.Yield()
会增加大量开销。此外,您永远不应该接管整个 ThreadPool
,因为 .NET 框架(和底层 OS 进程)可能需要它来做其他事情。在相关说明中,TPL 甚至具有 TaskCreationOptions.LongRunning
选项,该选项请求不 运行 在 ThreadPool
线程上执行任务(相反,它会创建一个带有 new Thread()
的普通线程场景)。
也就是说,使用 custom TaskScheduler
在某些专用的池外线程上使用有限的并行性,这些线程具有线程亲和力,适用于单个 long-运行宁任务可能是另一回事。至少,await
延续将发布在同一个线程上,这应该有助于减少切换开销。这让我想起了我刚才用 ThreadAffinityTaskScheduler
.
不过,根据特定情况,通常最好使用现有的行之有效且经过测试的工具。仅举几例:Parallel Class, TPL Dataflow, System.Threading.Channels, Reactive Extensions。
还有一整套现有的工业级解决方案来处理发布-订阅模式(RabbitMQ、PubNub、Redis、Azure Service Bus、Firebase Cloud Messaging (FCM)、Amazon Simple Queue Service (SQS) 等) ).
在与其他担心上下文切换及其对性能影响的用户就此问题进行了一些辩论之后。 我明白他们在担心什么。
但我的意思是:在循环中做一些事情... 是一项重要的任务 - 通常以消息处理程序的形式从队列中读取消息并进行处理它。消息处理程序通常是用户定义的,消息总线使用某种调度程序来执行它们。用户可以实现一个同步执行的处理程序(没有人知道用户会做什么),并且没有 Task.Yield 会阻塞线程以循环处理那些同步任务。
不是空话我添加了测试github:https://github.com/BBGONE/TestThreadAffinity 他们将 ThreadAffinityTaskScheduler、.NET ThreadScheduler 与 BlockingCollection 以及 .NET ThreadScheduler 与 Threading.Channels.
进行了比较测试表明,对于 Ultra Short 作业,性能下降是 约 15%。要使用 Task.Yield 而不会降低性能(即使很小) - 不要使用极短的任务,如果任务太短,则将较短的任务组合成更大的批次。
[上下文切换的价格] = [上下文切换时长] / ([作业时长]+[上下文切换时长])。
在那种情况下,切换任务对性能的影响可以忽略不计。但它增加了更好的系统任务协作和响应能力。
对于长 运行 任务,最好使用自定义调度程序,它在自己的专用线程池上执行任务 -(如 WorkStealingTaskScheduler)。
对于混合作业 - 可以包含不同的部分 - 短 运行 CPU 绑定、异步和长 运行 代码部分。最好把任务拆分成子任务
private async Task HandleLongRunMessage(TestMessage message, CancellationToken token = default(CancellationToken))
{
// SHORT SYNCHRONOUS TASK - execute as is on the default thread (from thread pool)
CPU_TASK(message, 50);
// IO BOUND ASYNCH TASK - used as is
await Task.Delay(50);
// BUT WRAP the LONG SYNCHRONOUS TASK inside the Task
// which is scheduled on the custom thread pool
// (to save threadpool threads)
await Task.Factory.StartNew(() => {
CPU_TASK(message, 100000);
}, token, TaskCreationOptions.DenyChildAttach, _workStealingTaskScheduler);
}