TPL 数据流从所有传入节点(多个生产者,1 个消费者)创建聚合结果数组
TPL Dataflow create aggregated result array from all incoming nodes (multiple producer, 1 consumer)
请看以下代码示例。我需要一个聚合器节点,它可以链接到任意数量的源,等待每个源各发送一条消息,然后将它们组合成一个结果数组(result[])。
这应该是显而易见且直截了当的,但不知何故我没有找到解决方案。
我检查了 JoinBlock 和 TransformBlock,但两者似乎都不合适。
using System;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApp2
{
    internal class Program
    {
        /// <summary>
        /// Links several producers to a single consumer through an aggregator that waits
        /// for one message from every producer and delivers them together as a
        /// <c>string[]</c>.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        private static void Main(string[] args)
        {
            BufferBlock<string> p1 = new BufferBlock<string>();
            BufferBlock<string> p2 = new BufferBlock<string>();
            BufferBlock<string>[] producers = { p1, p2 };

            // A non-greedy BatchBlock postpones incoming offers until enough sources have
            // offered a message to fill a batch, then consumes them together. A BufferBlock
            // only offers its head message, so each batch holds one message per producer.
            BatchBlock<string> aggregator = new BatchBlock<string>(
                producers.Length,
                new GroupingDataflowBlockOptions { Greedy = false });

            ActionBlock<string[]> c1 = new ActionBlock<string[]>((inputs) =>
            {
                Console.WriteLine(String.Join(", ", inputs));
            });

            foreach (BufferBlock<string> producer in producers)
            {
                // No PropagateCompletion here: with several sources linked to one target,
                // the first source to complete would complete the target for all of them.
                producer.LinkTo(aggregator);
            }

            aggregator.LinkTo(c1, new DataflowLinkOptions { PropagateCompletion = true });

            p1.Post("Produce 1.1");
            p2.Post("Produce 2.1");

            // Complete the aggregator only after every producer has been drained, then wait
            // for the consumer; otherwise Main returns before anything is printed.
            foreach (BufferBlock<string> producer in producers)
            {
                producer.Complete();
            }

            System.Threading.Tasks.Task.WaitAll(Array.ConvertAll(producers, p => p.Completion));
            aggregator.Complete();
            c1.Completion.Wait();

            // Output (order within a batch is not guaranteed, which is acceptable here):
            // "Produce 1.1, Produce 2.1"
        }
    }
}
[编辑]
进一步澄清:
我想要一个块:
- 动态地等待所有源节点(以第一条消息到达的时刻为准)都送达消息,然后聚合结果并传递给后续节点
如果您事先知道有哪些来源,我会使用 JoinBlock 和 TransformBlock。您必须为每个来源创建一个 BufferBlock。
首先,JoinBlock 等待来自每个源的一条消息,并将它们打包到一个元组中。然后 TransformBlock 从这个中间元组创建结果数组。
如果你事先不知道有哪些来源,就需要说明新块应如何判断何时产生结果。然后应将该逻辑放入一个自定义块中,可以采用 TransformManyBlock<string,string[]> 的形式。
如果要合并数量可变的源,可以像下面这样创建一个不限定源数量的聚合块:
/// <summary>
/// Wires three producer buffers into an aggregator that emits one array per three
/// messages, prints each array, and waits for Enter before exiting.
/// </summary>
private static void Main()
{
    var first = new BufferBlock<string>();
    var second = new BufferBlock<string>();
    var third = new BufferBlock<string>();
    var producers = new[] { first, second, third };

    var aggregator = CreateAggregatorBlock( producers.Length );
    var printer = new ActionBlock<string[]>( batch =>
    {
        var line = string.Join( ", ", batch );
        Console.WriteLine( line );
    } );

    foreach( var producer in producers )
    {
        producer.LinkTo( aggregator );
    }
    aggregator.LinkTo( printer );

    first.Post( "message 1" );
    second.Post( "message 2" );
    third.Post( "message 3" );

    // The blocks process asynchronously; without this, Main could return before anything is printed.
    Console.ReadLine();
}
/// <summary>
/// Creates a block that collects incoming messages and emits them as one array every
/// time <paramref name="sources"/> messages have arrived.
/// </summary>
/// <param name="sources">Number of messages per emitted array; must be at least 1.</param>
/// <returns>
/// A block that outputs a <c>string[]</c> of length <paramref name="sources"/> after every
/// <paramref name="sources"/> inputs, in arrival order.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="sources"/> is less than 1.</exception>
/// <remarks>
/// Messages are grouped purely by arrival order, not by origin: if one source posts twice
/// before another posts once, both of its messages can end up in the same array.
/// </remarks>
private static TransformManyBlock<string, string[]> CreateAggregatorBlock( int sources )
{
    // With sources <= 0 the count check below could never trigger as intended and the
    // block would buffer every message forever, so reject it up front.
    if( sources < 1 )
    {
        throw new ArgumentOutOfRangeException( nameof( sources ), sources, "At least one source is required." );
    }

    var buffer = new List<string>();

    // The captured list is not synchronized; this is only safe because the delegate never
    // runs concurrently with itself. MaxDegreeOfParallelism = 1 is the default, but is
    // stated explicitly so nobody raises it without noticing this dependency.
    var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    return new TransformManyBlock<string, string[]>( message =>
    {
        buffer.Add( message );
        if( buffer.Count < sources )
        {
            return Enumerable.Empty<string[]>();
        }

        var result = buffer.ToArray();
        buffer.Clear();
        return new[] { result };
    }, options );
}
这假设各个来源以相同的速率产生消息。如果情况并非如此,就需要在每条消息旁附上其来源的标识,并为每个来源维护一个单独的缓冲区。
您可以为此使用非贪婪(Greedy = false)的 BatchBlock。在非贪婪模式下,每个来源只会为一个批次贡献一个项目。这一方法最初是在另一篇回答中提出的(originally suggested here)。下面是一个经过测试的示例:
请注意,作为证明,source1 被发送了多个项目,但只有第一个出现在批次中:
/// <summary>
/// Test fixture showing that a non-greedy <see cref="BatchBlock{T}"/> takes exactly one
/// message from each of its five linked sources per batch.
/// </summary>
public class DataAggregator
{
    // Greedy = false: offers are postponed until enough linked sources have offered a
    // message to fill a batch of five, so a batch never takes two messages from one source.
    private readonly BatchBlock<string> batchBlock =
        new BatchBlock<string>(5, new GroupingDataflowBlockOptions() { Greedy = false });

    // Prints every string of each batch on its own line.
    private readonly ActionBlock<string[]> writer = new ActionBlock<string[]>(strings =>
    {
        foreach (var str in strings)
        {
            Console.WriteLine(str);
        }
    });

    private readonly BufferBlock<string> source1 = new BufferBlock<string>();
    private readonly BufferBlock<string> source2 = new BufferBlock<string>();
    private readonly BufferBlock<string> source3 = new BufferBlock<string>();
    private readonly BufferBlock<string> source4 = new BufferBlock<string>();
    private readonly BufferBlock<string> source5 = new BufferBlock<string>();

    // source1..source5 in link order; assigned in the constructor because a field
    // initializer cannot reference other instance fields.
    private readonly BufferBlock<string>[] allSources;

    /// <summary>Links all five sources to the batch block and the batch block to the writer.</summary>
    public DataAggregator()
    {
        allSources = new[] { source1, source2, source3, source4, source5 };

        // NOTE: with PropagateCompletion on every source link, batchBlock is completed by
        // whichever source completes FIRST, not after all of them have completed.
        foreach (var source in allSources)
        {
            source.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        }

        batchBlock.LinkTo(writer, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    /// <summary>
    /// Posts three messages to source1 and one to each other source, completes every
    /// source in order, and waits for the writer to finish.
    /// </summary>
    [Test]
    public async Task TestPipeline()
    {
        // A BufferBlock only offers its head message, so string1-2 and string1-3 are
        // not expected to appear in the (single) batch.
        foreach (var item in new[] { "string1-1", "string1-2", "string1-3" })
        {
            source1.Post(item);
        }

        source2.Post("string2-1");
        source3.Post("string3-1");
        source4.Post("string4-1");
        source5.Post("string5-1");
        //Should print string1-1 string2-1 string3-1 string4-1 string5-1

        foreach (var source in allSources)
        {
            source.Complete();
        }

        // source1 still holds undelivered items, so its own Completion is not expected to
        // finish; only the writer's completion is awaited.
        await writer.Completion;
    }
}
输出:
string1-1
string2-1
string3-1
string4-1
string5-1
请看以下代码示例。我需要一个聚合器节点,它可以链接到任意数量的源,等待每个源各发送一条消息,然后将它们组合成一个结果数组(result[])。
这应该是显而易见且直截了当的,但不知何故我没有找到解决方案。 我检查了 JoinBlock 和 TransformBlock,但两者似乎都不合适。
using System;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApp2
{
    internal class Program
    {
        /// <summary>
        /// Links several producers to a single consumer through an aggregator that waits
        /// for one message from every producer and delivers them together as a
        /// <c>string[]</c>.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        private static void Main(string[] args)
        {
            BufferBlock<string> p1 = new BufferBlock<string>();
            BufferBlock<string> p2 = new BufferBlock<string>();
            BufferBlock<string>[] producers = { p1, p2 };

            // A non-greedy BatchBlock postpones incoming offers until enough sources have
            // offered a message to fill a batch, then consumes them together. A BufferBlock
            // only offers its head message, so each batch holds one message per producer.
            BatchBlock<string> aggregator = new BatchBlock<string>(
                producers.Length,
                new GroupingDataflowBlockOptions { Greedy = false });

            ActionBlock<string[]> c1 = new ActionBlock<string[]>((inputs) =>
            {
                Console.WriteLine(String.Join(", ", inputs));
            });

            foreach (BufferBlock<string> producer in producers)
            {
                // No PropagateCompletion here: with several sources linked to one target,
                // the first source to complete would complete the target for all of them.
                producer.LinkTo(aggregator);
            }

            aggregator.LinkTo(c1, new DataflowLinkOptions { PropagateCompletion = true });

            p1.Post("Produce 1.1");
            p2.Post("Produce 2.1");

            // Complete the aggregator only after every producer has been drained, then wait
            // for the consumer; otherwise Main returns before anything is printed.
            foreach (BufferBlock<string> producer in producers)
            {
                producer.Complete();
            }

            System.Threading.Tasks.Task.WaitAll(Array.ConvertAll(producers, p => p.Completion));
            aggregator.Complete();
            c1.Completion.Wait();

            // Output (order within a batch is not guaranteed, which is acceptable here):
            // "Produce 1.1, Produce 2.1"
        }
    }
}
[编辑] 进一步澄清: 我想要一个块: - 动态地等待所有源节点(以第一条消息到达的时刻为准)都送达消息,然后聚合结果并传递给后续节点
如果您事先知道有哪些来源,我会使用 JoinBlock 和 TransformBlock。您必须为每个来源创建一个 BufferBlock。
首先,JoinBlock 等待来自每个源的一条消息,并将它们打包到一个元组中。然后 TransformBlock 从这个中间元组创建结果数组。
如果你事先不知道有哪些来源,就需要说明新块应如何判断何时产生结果。然后应将该逻辑放入一个自定义块中,可以采用 TransformManyBlock<string,string[]> 的形式。
如果要合并数量可变的源,可以像下面这样创建一个不限定源数量的聚合块:
/// <summary>
/// Wires three producer buffers into an aggregator that emits one array per three
/// messages, prints each array, and waits for Enter before exiting.
/// </summary>
private static void Main()
{
    var first = new BufferBlock<string>();
    var second = new BufferBlock<string>();
    var third = new BufferBlock<string>();
    var producers = new[] { first, second, third };

    var aggregator = CreateAggregatorBlock( producers.Length );
    var printer = new ActionBlock<string[]>( batch =>
    {
        var line = string.Join( ", ", batch );
        Console.WriteLine( line );
    } );

    foreach( var producer in producers )
    {
        producer.LinkTo( aggregator );
    }
    aggregator.LinkTo( printer );

    first.Post( "message 1" );
    second.Post( "message 2" );
    third.Post( "message 3" );

    // The blocks process asynchronously; without this, Main could return before anything is printed.
    Console.ReadLine();
}
/// <summary>
/// Creates a block that collects incoming messages and emits them as one array every
/// time <paramref name="sources"/> messages have arrived.
/// </summary>
/// <param name="sources">Number of messages per emitted array; must be at least 1.</param>
/// <returns>
/// A block that outputs a <c>string[]</c> of length <paramref name="sources"/> after every
/// <paramref name="sources"/> inputs, in arrival order.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="sources"/> is less than 1.</exception>
/// <remarks>
/// Messages are grouped purely by arrival order, not by origin: if one source posts twice
/// before another posts once, both of its messages can end up in the same array.
/// </remarks>
private static TransformManyBlock<string, string[]> CreateAggregatorBlock( int sources )
{
    // With sources <= 0 the count check below could never trigger as intended and the
    // block would buffer every message forever, so reject it up front.
    if( sources < 1 )
    {
        throw new ArgumentOutOfRangeException( nameof( sources ), sources, "At least one source is required." );
    }

    var buffer = new List<string>();

    // The captured list is not synchronized; this is only safe because the delegate never
    // runs concurrently with itself. MaxDegreeOfParallelism = 1 is the default, but is
    // stated explicitly so nobody raises it without noticing this dependency.
    var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    return new TransformManyBlock<string, string[]>( message =>
    {
        buffer.Add( message );
        if( buffer.Count < sources )
        {
            return Enumerable.Empty<string[]>();
        }

        var result = buffer.ToArray();
        buffer.Clear();
        return new[] { result };
    }, options );
}
这假设各个来源以相同的速率产生消息。如果情况并非如此,就需要在每条消息旁附上其来源的标识,并为每个来源维护一个单独的缓冲区。
您可以为此使用非贪婪(Greedy = false)的 BatchBlock。在非贪婪模式下,每个来源只会为一个批次贡献一个项目。这一方法最初是在另一篇回答中提出的(originally suggested here)。下面是一个经过测试的示例:
请注意,作为证明,source1 被发送了多个项目,但只有第一个出现在批次中:
/// <summary>
/// Test fixture showing that a non-greedy <see cref="BatchBlock{T}"/> takes exactly one
/// message from each of its five linked sources per batch.
/// </summary>
public class DataAggregator
{
    // Greedy = false: offers are postponed until enough linked sources have offered a
    // message to fill a batch of five, so a batch never takes two messages from one source.
    private readonly BatchBlock<string> batchBlock =
        new BatchBlock<string>(5, new GroupingDataflowBlockOptions() { Greedy = false });

    // Prints every string of each batch on its own line.
    private readonly ActionBlock<string[]> writer = new ActionBlock<string[]>(strings =>
    {
        foreach (var str in strings)
        {
            Console.WriteLine(str);
        }
    });

    private readonly BufferBlock<string> source1 = new BufferBlock<string>();
    private readonly BufferBlock<string> source2 = new BufferBlock<string>();
    private readonly BufferBlock<string> source3 = new BufferBlock<string>();
    private readonly BufferBlock<string> source4 = new BufferBlock<string>();
    private readonly BufferBlock<string> source5 = new BufferBlock<string>();

    // source1..source5 in link order; assigned in the constructor because a field
    // initializer cannot reference other instance fields.
    private readonly BufferBlock<string>[] allSources;

    /// <summary>Links all five sources to the batch block and the batch block to the writer.</summary>
    public DataAggregator()
    {
        allSources = new[] { source1, source2, source3, source4, source5 };

        // NOTE: with PropagateCompletion on every source link, batchBlock is completed by
        // whichever source completes FIRST, not after all of them have completed.
        foreach (var source in allSources)
        {
            source.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        }

        batchBlock.LinkTo(writer, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    /// <summary>
    /// Posts three messages to source1 and one to each other source, completes every
    /// source in order, and waits for the writer to finish.
    /// </summary>
    [Test]
    public async Task TestPipeline()
    {
        // A BufferBlock only offers its head message, so string1-2 and string1-3 are
        // not expected to appear in the (single) batch.
        foreach (var item in new[] { "string1-1", "string1-2", "string1-3" })
        {
            source1.Post(item);
        }

        source2.Post("string2-1");
        source3.Post("string3-1");
        source4.Post("string4-1");
        source5.Post("string5-1");
        //Should print string1-1 string2-1 string3-1 string4-1 string5-1

        foreach (var source in allSources)
        {
            source.Complete();
        }

        // source1 still holds undelivered items, so its own Completion is not expected to
        // finish; only the writer's completion is awaited.
        await writer.Completion;
    }
}
输出:
string1-1
string2-1
string3-1
string4-1
string5-1