并行执行线程,但以相同顺序调用事件

Execute threads in parrallel, but calling events in the same order

我有一个场景,我有一个数据队列要处理。所有数据都被一个线程读入内存并存储到 ConcurrentQueue<T> 中,而另一个线程开始出列和处理数据。 T 是自定义项 class,需要处理大量数据。

reader 线程填充队列的一侧,而处理器线程在队列的另一侧工作。使用两个线程,这非常有效。处理数据所花费的时间 work/time 是将数据加载到内存中的大约 4 倍。所以我一直在尝试增加处理线程的数量。问题来了,处理后的数据需要按照读取的顺序保存。显然让多个线程并行处理不同的数据意味着它们不会同时完成。由于 ConcurrentQueue,线程确实按顺序读取数据,并且它们以正确的顺序将数据出队,但我还没有找到一种方法来同步线程的“保存”功能,以确保每个线程都将“保存”在他们出队的顺序相同。

我知道 .NET 包含大量线程助手,我也看过 Monitor 和 Barrier 之类的东西,但它们差异太大,我不确定哪个助手 class 或哪种方法效果最好。

有人有什么建议或想法吗?

有很多方法可以做到这一点。这是一个 TPL 数据流示例

DataFlow 有几个优点

  1. 它可以处理同步和异步工作负载。
  2. 您可以创建更大的管道。
  3. 支持任务计划程序和取消令牌。
  4. 可以运行永久,或强制完成。
  5. 每个区块可以支持多个生产者和消费者

通过

它确实有一些缺点
  1. 对于新手来说,这有点学习曲线。
  2. 它是围绕管道设计的,而不是线性集合本身,因此使用它们可能有点不直观。
  3. 创建您自己的自定义块需要深入了解 Stephen Toub 的 Twisted TPL 大脑。
  4. 它不像其他生产者消费者框架那样轻便,但它以灵活性弥补了这一点

例子

// multi threaded Consumer
var processor = new TransformBlock<Data, Data>(
   ProcessAsync,
   new ExecutionDataflowBlockOptions() 
   {
      EnsureOrdered = true, 
      MaxDegreeOfParallelism = 3

   });

var saver = new ActionBlock<Data>(
   SaveAsync, 
   new ExecutionDataflowBlockOptions()
   {
      MaxDegreeOfParallelism = 1, 
      SingleProducerConstrained = true
   });

processor.LinkTo(saver,new DataflowLinkOptions() {PropagateCompletion = true});

// Producer
for (int i = 0; i < 15; i++)
{
   Console.WriteLine($"Queueing : {i}");
   await processor.SendAsync(new Data() {Id = i});
}

processor.Complete();
await saver.Completion;

工作负载

public static Random r = new Random();
private static async Task<Data> SaveAsync(Data arg)
{
   await Task.Delay(r.Next(100, 1000));
   Console.WriteLine($"Saving : {arg.Id}");
   return arg;
}
private static async Task<Data> ProcessAsync(Data arg)
{
   await Task.Delay(r.Next(100, 1000));
   Console.WriteLine($"Processing : {arg.Id}");
   return arg;
}

输出

Queueing : 0
Queueing : 1
Queueing : 2
Queueing : 3
Queueing : 4
Queueing : 5
Queueing : 6
Queueing : 7
Queueing : 8
Queueing : 9
Queueing : 10
Queueing : 11
Queueing : 12
Queueing : 13
Queueing : 14
Processing : 2
Processing : 3
Processing : 0
Processing : 1
Processing : 4
Saving : 0
Processing : 5
Processing : 8
Saving : 1
Processing : 7
Processing : 6
Processing : 10
Saving : 2
Processing : 9
Processing : 11
Processing : 12
Processing : 13
Saving : 3
Processing : 14
Saving : 4
Saving : 5
Saving : 6
Saving : 7
Saving : 8
Saving : 9
Saving : 10
Saving : 11
Saving : 12
Saving : 13
Saving : 14

免责声明,还有许多其他方法,例如 Reactive Extensions,或者只是常规的 TPL 和循环,都有优点和缺点,有些可能比其他方法更适合您的用例。这只是数据流管道的一个基本示例。