分区队列以进行并行处理
Partition a Queue for Parallel Processing
我正在编写一个从数千个客户端获取数据的服务器。我想并行处理这些数据,但是每个客户端的数据必须按顺序顺序处理。
例如,如果我有客户 A、B 和 C,我会同时处理来自所有 3 个客户的数据,但一次只处理来自每个客户的一个项目。
我的第一个尝试是创建一种分区队列,每个分区有一个 Task
使用:ConcurrentDictionary<string, BlockingCollection<T>>
,其中 string
是客户端标识符。有了这个,我启动了一个 Task
来从每个客户端消费,并将它们保存在字典 ConcurrentDictionary<string, Task>
中。任务很简单,使用 GetConsumingEnumerable
:
return Task.Run(() =>
{
foreach (var item in list.GetConsumingEnumerable())
{
this.action(item);
}
});
这很好用,但最多只能有大约 75 个客户端 - 之后,每秒项目数性能会迅速下降,这可能是由于任务之间的争用。
有更好的方法吗?也许使用较小的固定数量的消费线程可以以某种方式轮换它们正在消费的分区?
我在 .NET Framework 本身中没有发现任何有用的东西,但我感觉可以以某种方式利用 TPL?
编辑
我 而不是 只是询问如何处理具有多个生产者和单个消费者的单个队列。另外,我 仅 对基于 TPL 的解决方案感兴趣;我只是觉得这可能有帮助。
我想我在考虑为每个客户端设置一个单独的分区时偏离了方向 - 我想到的是根据客户端 ID 的哈希码(这是一个 GUID)。
这样做的好处是让我可以控制并发级别,而且实现起来也很简单。
所以我的队列定义如下:
Dictionary<int, BlockingCollection<T>> queues;
数据添加如下:
public void Enqueue(T val)
{
var bucket = val.Id.GetHashCode() % this.maxConcurrent;
this.queues[bucket].Add(val);
}
我正在编写一个从数千个客户端获取数据的服务器。我想并行处理这些数据,但是每个客户端的数据必须按顺序顺序处理。
例如,如果我有客户 A、B 和 C,我会同时处理来自所有 3 个客户的数据,但一次只处理来自每个客户的一个项目。
我的第一个尝试是创建一种分区队列,每个分区有一个 Task
使用:ConcurrentDictionary<string, BlockingCollection<T>>
,其中 string
是客户端标识符。有了这个,我启动了一个 Task
来从每个客户端消费,并将它们保存在字典 ConcurrentDictionary<string, Task>
中。任务很简单,使用 GetConsumingEnumerable
:
return Task.Run(() =>
{
foreach (var item in list.GetConsumingEnumerable())
{
this.action(item);
}
});
这很好用,但最多只能有大约 75 个客户端 - 之后,每秒项目数性能会迅速下降,这可能是由于任务之间的争用。
有更好的方法吗?也许使用较小的固定数量的消费线程可以以某种方式轮换它们正在消费的分区?
我在 .NET Framework 本身中没有发现任何有用的东西,但我感觉可以以某种方式利用 TPL?
编辑 我 而不是 只是询问如何处理具有多个生产者和单个消费者的单个队列。另外,我 仅 对基于 TPL 的解决方案感兴趣;我只是觉得这可能有帮助。
我想我在考虑为每个客户端设置一个单独的分区时偏离了方向 - 我想到的是根据客户端 ID 的哈希码(这是一个 GUID)。
这样做的好处是让我可以控制并发级别,而且实现起来也很简单。
所以我的队列定义如下:
Dictionary<int, BlockingCollection<T>> queues;
数据添加如下:
public void Enqueue(T val)
{
var bucket = val.Id.GetHashCode() % this.maxConcurrent;
this.queues[bucket].Add(val);
}