如何防止 CPU 的 "maxing out":同步方法异步调用多个工作程序并使用 SemaphoreSlim 进行节流?
How do I prevent "maxing out" of CPU: Synchronous method calling multiple workers asynchronously & throttling using SemaphoreSlim?
我目前正在优化现有的、非常缓慢且超时的生产应用程序。 没有重写的选项。
简而言之,它是一个 WCF 服务,当前依次调用其他 4 个 "worker" WCF 服务。 None 的工作人员服务依赖于其他服务的结果。 所以我们希望它一次调用它们(而不是顺序)。我会重申,我们没有重写它的奢侈。
优化涉及使其一次调用所有工作程序服务。这就是想到异步的地方。
我在异步编程方面的经验有限,但我已经尽可能广泛地阅读了关于我的解决方案的主题。
问题是,在测试中,它可以工作,但超出了我的 CPU。非常感谢您的帮助
以下是主要 WCF 服务中的基本代码的简化版本
// The service operation belonging to main WCF Service
public void ProcessAllPendingWork()
{
var workerTasks = new List<Task<bool>>();
foreach(var workerService in _workerServices)
{
//DoWorkAsync is the worker method with the following signature:
// Task<bool> DoWorkAsync()
var workerTask = workerService.DoWorkAsync()
workerTasks.Add(workerTask);
}
var task = Task.Run(async ()=>
{
await RunWorkerTasks(workerTasks);
});
task.Wait();
}
private async RunWorkerTasks(IEnumerable<Tast<bool>> workerTasks)
{
using(var semaphore = new SemaphoreSlim(initialCount:3))
{
foreach (var workerTask in workerTasks)
{
await semaphore.WaitAsync();
try
{
await workerTask;
}
catch (System.Exception)
{
//assume 'Log' is a predefined logging service
Log.Error(ex);
}
}
}
}
我读过的内容:
Multiple ways how to limit parallel tasks processing
How to limit the amount of concurrent async I/O operations?
Approaches for throttling asynchronous methods in C#
Constraining Concurrent Threads in C#
Limiting Number of Concurrent Threads With SemaphoresSlim
Async WCF call with ChannelFactory and CreateChannel
你没有解释你想如何限制并发调用。你想要 30 个并发工作任务 运行ning,还是你想要 30 个 WCF 调用,每个调用都有它们的所有工作任务 运行ning,或者你想要并发 WCF 调用每个都有自己的并发工作任务的限制?鉴于您说过每个 WCF 调用只有 4 个工作任务并查看您的示例代码,我假设您希望全局限制 30 个并发工作任务。
首先,正如@mjwills 暗示的那样,您需要使用 SemaphoreSlim 来限制对 workerService.DoWorkAsync()
的调用。您的代码当前启动了所有这些,并且只试图限制您等待完成的数量。我想这就是为什么你会最大化 CPU 的原因。启动的工作任务数保持无限。但是请注意,您还需要在持有信号量时等待工作任务,否则您只会限制创建任务的速度,而不是并发的数量 运行。
其次,您要为每个 WCF 请求创建一个新的 SemaphoreSlim。因此,我的第一段提出了问题。这会限制任何事情的唯一方法是,如果你有比初始计数更多的工人服务,在你的样本中是 30,但你说只有 4 个工人。要有 "global" 限制,您需要使用单例 SemaphoreSlim。
第三,您永远不会在 SemaphoreSlim 上调用 .Release()
,因此,如果您确实将其设为单例,则一旦进程启动 30 个工作线程,您的代码就会挂起。确保在 try-finally 块中执行此操作,这样如果 worker 崩溃,它仍会被释放。
下面是仓促写的示例代码:
public async Task ProcessAllPendingWork()
{
var workerTasks = new List<Task<bool>>();
foreach(var workerService in _workerServices)
{
var workerTask = RunWorker(workerService);
workerTasks.Add(workerTask);
}
await Task.WhenAll(workerTasks);
}
private async Task<bool> RunWorker(Func<bool> workerService)
{
// use singleton semaphore.
await _semaphore.WaitAsync();
try
{
return await workerService.DoWorkAsync();
}
catch (System.Exception)
{
//assume error is a predefined logging service
Log.Error(ex);
return false; // ??
}
finally
{
_semaphore.Release();
}
}
TPL(Task parallel library)提供的Task抽象是对Thread的抽象;任务在线程池中排队,然后在执行者可以管理该请求时执行。
换句话说,根据某些因素(您的流量,CPU 与 IO 绑定和部署模型)尝试在您的工作函数中执行托管任务可能根本没有任何好处(或在某些情况下)慢一点)。
也就是说,我建议您使用 Task.WaitAll(可从 .NET 4.0 获得),它使用非常高级的抽象来管理并发性;特别是这段代码可能对您有用:
- 它创建工人并等待所有工人
- 执行需要10秒(最长的Worker)
- 它会捕获并为您提供管理异常的机会
- [最后但并非最不重要] 是一个声明 api,将您的注意力集中在做什么而不是如何做上。
public class Q57572902
{
public void ProcessAllPendingWork()
{
var workers = new Action[] {Worker1, Worker2, Worker3};
try
{
Task.WaitAll(workers.Select(Task.Factory.StartNew).ToArray());
// ok
}
catch (AggregateException exceptions)
{
foreach (var ex in exceptions.InnerExceptions)
{
Log.Error(ex);
}
// ko
}
}
public void Worker1() => Thread.Sleep(FromSeconds(5)); // do something
public void Worker2() => Thread.Sleep(FromSeconds(10)); // do something
public void Worker3() => throw new NotImplementedException("error to manage"); // something wrong
}
我看评论说你们最多同时需要3个工人运行;在这种情况下,您只需从 TaskScheduler 文档中复制粘贴 LimitedConcurrencyLevelTaskScheduler
。
之后你必须创建单实例 TaskScheduler
和它的 onw TaskFactory
像这样:
public static class WorkerScheduler
{
public static readonly TaskFactory Factory;
static WorkerScheduler()
{
var scheduler = new LimitedConcurrencyLevelTaskScheduler(3);
Factory = new TaskFactory(scheduler);
}
}
之前的 ProcessAllPendingWork()
代码保持不变,除了
...workers.Select(Task.Factory.StartNew)...
变成了
...workers.Select(WorkerScheduler.Factory.StartNew)...
因为您必须使用与您的自定义 WorkerScheduler
关联的 TaskFactory
。
如果您的工作人员需要 return 响应一些数据,则需要以不同的方式管理错误和数据,如下所示:
public void ProcessAllPendingWork()
{
var workers = new Func<bool>[] {Worker1, Worker2, Worker3};
var tasks = workers.Select(WorkerScheduler.Factory.StartNew).ToArray();
bool[] results = null;
Task
.WhenAll(tasks)
.ContinueWith(x =>
{
if (x.Status == TaskStatus.Faulted)
{
foreach (var exception in x.Exception.InnerExceptions)
Log(exception);
return;
}
results = x.Result; // save data in outer scope
})
.Wait();
// continue execution
// results is now filled: if results is null, some errors occured
}
除非我遗漏了什么 - 你的示例代码 运行 是并行的所有工作人员。在调用 'workerService.DoWorkAsync()' 时,工作人员开始工作。 'RunWorkerTasks' 只等待工作任务完成。 'DoWorkAsync()' 启动异步操作,而 'await' 暂停调用方法的执行,直到等待的任务完成。
CPU 使用率高的事实很可能是由于您的 workerService 的 activity 而不是您调用它们的方式。为了验证这一点,请尝试将 workerService.DoWorkAsync()
替换为 Thread.Sleep(..)
或 Task.Delay(..)
。如果您的 CPU 使用率下降,那是工人的责任。 (取决于 workerService 的作用)一旦 运行 它们并行,CPU 消耗可能会增加,甚至可以预期。
关于如何限制并行执行的问题。请注意,以下示例并未完全使用 3 个线程,但最多使用 3 个线程。
Parallel.ForEach(
_workerServices,
new ParallelOptions { MaxDegreeOfParallelism = 3 },
workerService => workerService.DoWorkAsync()
.ContinueWith(res =>
{
// Handle your result or possible exceptions by consulting res.
})
.Wait());
正如您之前提到的,您的代码是按顺序执行的,我假设工作人员也有一个非异步等效项。使用它们可能更容易。同步调用异步方法主要是一件麻烦事。我什至通过调用 DoWorkAsync().Wait()
遇到过死锁情况。关于 How would I run an async Task<T> method synchronously? 的讨论很多。本质上,我尽量避免它。如果那不可能,我会尝试使用 ContinueWith
这会增加复杂性,或者使用之前 SO 讨论的 AsyncHelper
。
var results = new ConcurrentDictionary<WorkerService, bool>();
Parallel.ForEach(
_workerServices,
new ParallelOptions { MaxDegreeOfParallelism = 3 },
workerService =>
{
// Handle possible exceptions via try-catch.
results.TryAdd(workerService, workerService.DoWork());
});
// evaluate results
Parallel.ForEach
利用线程池或任务池。这意味着它将给定参数 Action<TSource> body
的每次执行分派到专用线程上。您可以使用以下代码轻松验证这一点。如果 Parallel.ForEach
已经在不同的线程上分派工作,您可以简单地同步执行 'expensive' 操作。任何异步操作都是不必要的,甚至会对 运行 时间性能产生不良影响。
Parallel.ForEach(
Enumerable.Range(1, 4),
m => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
这是我用来测试的demo工程,不依赖你的workerService
private static bool DoWork()
{
Thread.Sleep(5000);
Console.WriteLine($"done by {Thread.CurrentThread.ManagedThreadId}.");
return DateTime.Now.Millisecond % 2 == 0;
}
private static Task<bool> DoWorkAsync() => Task.Run(DoWork);
private static void Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();
// define a thread-safe dict to store the results of the async operation
var results = new ConcurrentDictionary<int, bool>();
Parallel.ForEach(
Enumerable.Range(1, 4), // this replaces the list of workers
new ParallelOptions { MaxDegreeOfParallelism = 3 },
// m => results.TryAdd(m, DoWork()), // this is the alternative synchronous call
m => DoWorkAsync().ContinueWith(res => results.TryAdd(m, res.Result)).Wait());
sw.Stop();
// print results
foreach (var item in results)
{
Console.WriteLine($"{item.Key}={item.Value}");
}
Console.WriteLine(sw.Elapsed.ToString());
Console.ReadLine();
}
我目前正在优化现有的、非常缓慢且超时的生产应用程序。 没有重写的选项。
简而言之,它是一个 WCF 服务,当前依次调用其他 4 个 "worker" WCF 服务。 None 的工作人员服务依赖于其他服务的结果。 所以我们希望它一次调用它们(而不是顺序)。我会重申,我们没有重写它的奢侈。
优化涉及使其一次调用所有工作程序服务。这就是想到异步的地方。
我在异步编程方面的经验有限,但我已经尽可能广泛地阅读了关于我的解决方案的主题。
问题是,在测试中,它可以工作,但超出了我的 CPU。非常感谢您的帮助
以下是主要 WCF 服务中的基本代码的简化版本
// The service operation belonging to main WCF Service
public void ProcessAllPendingWork()
{
var workerTasks = new List<Task<bool>>();
foreach(var workerService in _workerServices)
{
//DoWorkAsync is the worker method with the following signature:
// Task<bool> DoWorkAsync()
var workerTask = workerService.DoWorkAsync()
workerTasks.Add(workerTask);
}
var task = Task.Run(async ()=>
{
await RunWorkerTasks(workerTasks);
});
task.Wait();
}
private async RunWorkerTasks(IEnumerable<Tast<bool>> workerTasks)
{
using(var semaphore = new SemaphoreSlim(initialCount:3))
{
foreach (var workerTask in workerTasks)
{
await semaphore.WaitAsync();
try
{
await workerTask;
}
catch (System.Exception)
{
//assume 'Log' is a predefined logging service
Log.Error(ex);
}
}
}
}
我读过的内容:
Multiple ways how to limit parallel tasks processing
How to limit the amount of concurrent async I/O operations?
Approaches for throttling asynchronous methods in C#
Constraining Concurrent Threads in C#
Limiting Number of Concurrent Threads With SemaphoresSlim
Async WCF call with ChannelFactory and CreateChannel
你没有解释你想如何限制并发调用。你想要 30 个并发工作任务 运行ning,还是你想要 30 个 WCF 调用,每个调用都有它们的所有工作任务 运行ning,或者你想要并发 WCF 调用每个都有自己的并发工作任务的限制?鉴于您说过每个 WCF 调用只有 4 个工作任务并查看您的示例代码,我假设您希望全局限制 30 个并发工作任务。
首先,正如@mjwills 暗示的那样,您需要使用 SemaphoreSlim 来限制对 workerService.DoWorkAsync()
的调用。您的代码当前启动了所有这些,并且只试图限制您等待完成的数量。我想这就是为什么你会最大化 CPU 的原因。启动的工作任务数保持无限。但是请注意,您还需要在持有信号量时等待工作任务,否则您只会限制创建任务的速度,而不是并发的数量 运行。
其次,您要为每个 WCF 请求创建一个新的 SemaphoreSlim。因此,我的第一段提出了问题。这会限制任何事情的唯一方法是,如果你有比初始计数更多的工人服务,在你的样本中是 30,但你说只有 4 个工人。要有 "global" 限制,您需要使用单例 SemaphoreSlim。
第三,您永远不会在 SemaphoreSlim 上调用 .Release()
,因此,如果您确实将其设为单例,则一旦进程启动 30 个工作线程,您的代码就会挂起。确保在 try-finally 块中执行此操作,这样如果 worker 崩溃,它仍会被释放。
下面是仓促写的示例代码:
public async Task ProcessAllPendingWork()
{
var workerTasks = new List<Task<bool>>();
foreach(var workerService in _workerServices)
{
var workerTask = RunWorker(workerService);
workerTasks.Add(workerTask);
}
await Task.WhenAll(workerTasks);
}
private async Task<bool> RunWorker(Func<bool> workerService)
{
// use singleton semaphore.
await _semaphore.WaitAsync();
try
{
return await workerService.DoWorkAsync();
}
catch (System.Exception)
{
//assume error is a predefined logging service
Log.Error(ex);
return false; // ??
}
finally
{
_semaphore.Release();
}
}
TPL(Task parallel library)提供的Task抽象是对Thread的抽象;任务在线程池中排队,然后在执行者可以管理该请求时执行。
换句话说,根据某些因素(您的流量,CPU 与 IO 绑定和部署模型)尝试在您的工作函数中执行托管任务可能根本没有任何好处(或在某些情况下)慢一点)。
也就是说,我建议您使用 Task.WaitAll(可从 .NET 4.0 获得),它使用非常高级的抽象来管理并发性;特别是这段代码可能对您有用:
- 它创建工人并等待所有工人
- 执行需要10秒(最长的Worker)
- 它会捕获并为您提供管理异常的机会
- [最后但并非最不重要] 是一个声明 api,将您的注意力集中在做什么而不是如何做上。
public class Q57572902
{
public void ProcessAllPendingWork()
{
var workers = new Action[] {Worker1, Worker2, Worker3};
try
{
Task.WaitAll(workers.Select(Task.Factory.StartNew).ToArray());
// ok
}
catch (AggregateException exceptions)
{
foreach (var ex in exceptions.InnerExceptions)
{
Log.Error(ex);
}
// ko
}
}
public void Worker1() => Thread.Sleep(FromSeconds(5)); // do something
public void Worker2() => Thread.Sleep(FromSeconds(10)); // do something
public void Worker3() => throw new NotImplementedException("error to manage"); // something wrong
}
我看评论说你们最多同时需要3个工人运行;在这种情况下,您只需从 TaskScheduler 文档中复制粘贴 LimitedConcurrencyLevelTaskScheduler
。
之后你必须创建单实例 TaskScheduler
和它的 onw TaskFactory
像这样:
public static class WorkerScheduler
{
public static readonly TaskFactory Factory;
static WorkerScheduler()
{
var scheduler = new LimitedConcurrencyLevelTaskScheduler(3);
Factory = new TaskFactory(scheduler);
}
}
之前的 ProcessAllPendingWork()
代码保持不变,除了
...workers.Select(Task.Factory.StartNew)...
变成了
...workers.Select(WorkerScheduler.Factory.StartNew)...
因为您必须使用与您的自定义 WorkerScheduler
关联的 TaskFactory
。
如果您的工作人员需要 return 响应一些数据,则需要以不同的方式管理错误和数据,如下所示:
public void ProcessAllPendingWork()
{
var workers = new Func<bool>[] {Worker1, Worker2, Worker3};
var tasks = workers.Select(WorkerScheduler.Factory.StartNew).ToArray();
bool[] results = null;
Task
.WhenAll(tasks)
.ContinueWith(x =>
{
if (x.Status == TaskStatus.Faulted)
{
foreach (var exception in x.Exception.InnerExceptions)
Log(exception);
return;
}
results = x.Result; // save data in outer scope
})
.Wait();
// continue execution
// results is now filled: if results is null, some errors occured
}
除非我遗漏了什么 - 你的示例代码 运行 是并行的所有工作人员。在调用 'workerService.DoWorkAsync()' 时,工作人员开始工作。 'RunWorkerTasks' 只等待工作任务完成。 'DoWorkAsync()' 启动异步操作,而 'await' 暂停调用方法的执行,直到等待的任务完成。
CPU 使用率高的事实很可能是由于您的 workerService 的 activity 而不是您调用它们的方式。为了验证这一点,请尝试将 workerService.DoWorkAsync()
替换为 Thread.Sleep(..)
或 Task.Delay(..)
。如果您的 CPU 使用率下降,那是工人的责任。 (取决于 workerService 的作用)一旦 运行 它们并行,CPU 消耗可能会增加,甚至可以预期。
关于如何限制并行执行的问题。请注意,以下示例并未完全使用 3 个线程,但最多使用 3 个线程。
Parallel.ForEach(
_workerServices,
new ParallelOptions { MaxDegreeOfParallelism = 3 },
workerService => workerService.DoWorkAsync()
.ContinueWith(res =>
{
// Handle your result or possible exceptions by consulting res.
})
.Wait());
正如您之前提到的,您的代码是按顺序执行的,我假设工作人员也有一个非异步等效项。使用它们可能更容易。同步调用异步方法主要是一件麻烦事。我什至通过调用 DoWorkAsync().Wait()
遇到过死锁情况。关于 How would I run an async Task<T> method synchronously? 的讨论很多。本质上,我尽量避免它。如果那不可能,我会尝试使用 ContinueWith
这会增加复杂性,或者使用之前 SO 讨论的 AsyncHelper
。
var results = new ConcurrentDictionary<WorkerService, bool>();
Parallel.ForEach(
_workerServices,
new ParallelOptions { MaxDegreeOfParallelism = 3 },
workerService =>
{
// Handle possible exceptions via try-catch.
results.TryAdd(workerService, workerService.DoWork());
});
// evaluate results
Parallel.ForEach
利用线程池或任务池。这意味着它将给定参数 Action<TSource> body
的每次执行分派到专用线程上。您可以使用以下代码轻松验证这一点。如果 Parallel.ForEach
已经在不同的线程上分派工作,您可以简单地同步执行 'expensive' 操作。任何异步操作都是不必要的,甚至会对 运行 时间性能产生不良影响。
Parallel.ForEach(
Enumerable.Range(1, 4),
m => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
这是我用来测试的demo工程,不依赖你的workerService
private static bool DoWork()
{
Thread.Sleep(5000);
Console.WriteLine($"done by {Thread.CurrentThread.ManagedThreadId}.");
return DateTime.Now.Millisecond % 2 == 0;
}
private static Task<bool> DoWorkAsync() => Task.Run(DoWork);
private static void Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();
// define a thread-safe dict to store the results of the async operation
var results = new ConcurrentDictionary<int, bool>();
Parallel.ForEach(
Enumerable.Range(1, 4), // this replaces the list of workers
new ParallelOptions { MaxDegreeOfParallelism = 3 },
// m => results.TryAdd(m, DoWork()), // this is the alternative synchronous call
m => DoWorkAsync().ContinueWith(res => results.TryAdd(m, res.Result)).Wait());
sw.Stop();
// print results
foreach (var item in results)
{
Console.WriteLine($"{item.Key}={item.Value}");
}
Console.WriteLine(sw.Elapsed.ToString());
Console.ReadLine();
}