如何在某些任务非常昂贵的任务中负载平衡并行性?
How do I load balance parallelism in tasks wherein some tasks are very costly?
我有一个需要处理的对象列表。所以说这个列表是所有客户的列表,我需要对所有客户进行 CPU 密集计算。尽管在此计算之前和之后我需要获取数据并将其提交回数据库,因此它不仅仅是一项 CPU 任务。
所以我做的是
Parallel.ForEach(list, action);
Action 字面意思
1 Fetch customer data
2 Process calculate (time and memory intensive task)
3 Commit back customer data
该代码过去运行良好,但最近有时当 多个 具有非常 大量记录 的客户被处理时,我们会出现系统故障内存。
那么有没有办法负载平衡呢?大多数客户都被快速处理,但很少有人能把所有资源都拿走。我可以避免其中的几个 运行 吗?
我可以实现这一点的一种方法是根据大小对列表进行排序,然后尝试选择第一个和最后一个项目并自己控制并行度,但我想看看我在这里有什么选择。
既然你说你在实际完成计算之前就有了计算量的近似值,那么它大大简化了操作。那时你只需要一个同步原语,它不限制要执行的操作的数量,而是有一些总权重值,并确保所有当前运行ning操作小于指定的权重值。然后,您可以请求具有给定权重值的给定操作 运行s,在有足够的未使用权重值之前,它实际上不会 运行。
没有现成的原语可以完全做到这一点(信号量非常接近,但还不完全存在)。但是,您可以相当容易地从现有的同步原语中创建一个。
public class WeightedSemaphore
{
public WeightedSemaphore(int totalWeight)
{
currentWeight = TotalWeight = totalWeight;
}
private ManualResetEvent signal = new ManualResetEvent(false);
private int currentWeight;
public int TotalWeight { get; }
public int CurrentWeight { get { lock (signal) return currentWeight; } }
public void Wait(int weight)
{
while (true)
{
lock (signal)
{
if (currentWeight >= weight)
{
currentWeight -= weight;
return;
}
}
signal.Reset();
signal.WaitOne();
}
}
public void Release(int weight)
{
lock (signal)
{
currentWeight += weight;
signal.Set();
}
}
}
现在您可以完成每个操作,确保在执行工作之前他们等待并提供他们的 "size" 值。从那里开始,只需进行一些实验即可确定您当前系统可以支持的总重量。
请注意,这样做的副作用是您会发现越快的操作越容易获得优先级。当一些 space 被释放时,较短的操作更有可能 运行 那里有什么,这意味着它会在更昂贵的操作甚至获得之前保留 space在 运行ning 拍摄。在许多情况下,这实际上是 可取的 属性,因为当您优先考虑更快的操作而不是更多的操作时,平均响应时间实际上会 下降贵的。
我有一个需要处理的对象列表。所以说这个列表是所有客户的列表,我需要对所有客户进行 CPU 密集计算。尽管在此计算之前和之后我需要获取数据并将其提交回数据库,因此它不仅仅是一项 CPU 任务。
所以我做的是
Parallel.ForEach(list, action);
Action 字面意思
1 Fetch customer data
2 Process calculate (time and memory intensive task)
3 Commit back customer data
该代码过去运行良好,但最近有时当 多个 具有非常 大量记录 的客户被处理时,我们会出现系统故障内存。
那么有没有办法负载平衡呢?大多数客户都被快速处理,但很少有人能把所有资源都拿走。我可以避免其中的几个 运行 吗?
我可以实现这一点的一种方法是根据大小对列表进行排序,然后尝试选择第一个和最后一个项目并自己控制并行度,但我想看看我在这里有什么选择。
既然你说你在实际完成计算之前就有了计算量的近似值,那么它大大简化了操作。那时你只需要一个同步原语,它不限制要执行的操作的数量,而是有一些总权重值,并确保所有当前运行ning操作小于指定的权重值。然后,您可以请求具有给定权重值的给定操作 运行s,在有足够的未使用权重值之前,它实际上不会 运行。
没有现成的原语可以完全做到这一点(信号量非常接近,但还不完全存在)。但是,您可以相当容易地从现有的同步原语中创建一个。
public class WeightedSemaphore
{
public WeightedSemaphore(int totalWeight)
{
currentWeight = TotalWeight = totalWeight;
}
private ManualResetEvent signal = new ManualResetEvent(false);
private int currentWeight;
public int TotalWeight { get; }
public int CurrentWeight { get { lock (signal) return currentWeight; } }
public void Wait(int weight)
{
while (true)
{
lock (signal)
{
if (currentWeight >= weight)
{
currentWeight -= weight;
return;
}
}
signal.Reset();
signal.WaitOne();
}
}
public void Release(int weight)
{
lock (signal)
{
currentWeight += weight;
signal.Set();
}
}
}
现在您可以完成每个操作,确保在执行工作之前他们等待并提供他们的 "size" 值。从那里开始,只需进行一些实验即可确定您当前系统可以支持的总重量。
请注意,这样做的副作用是您会发现越快的操作越容易获得优先级。当一些 space 被释放时,较短的操作更有可能 运行 那里有什么,这意味着它会在更昂贵的操作甚至获得之前保留 space在 运行ning 拍摄。在许多情况下,这实际上是 可取的 属性,因为当您优先考虑更快的操作而不是更多的操作时,平均响应时间实际上会 下降贵的。