如何在 JoinBlock 之后完成一个 Block

How to complete a Block after JoinBlock

我有下面的 senario 来使用 TPL 执行数据流。 ab是一些数据源连接在一起,然后通过数据流。

var a = new TransformBlock<object, int>(_ => 1);
var b = new TransformBlock<object, int>(_ => 2);

var join = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });

var transform = new TransformBlock<Tuple<int, int>, int>(uri =>
{
    Console.WriteLine("Handling '{0}'...", uri);
    return uri.Item1;
});

var printReversedWords = new ActionBlock<int>(ddd => Console.WriteLine(ddd));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

a.LinkTo(join.Target1);
b.LinkTo(join.Target2);
join.LinkTo(transform);
transform.LinkTo(printReversedWords, linkOptions);

a.Post(1);
b.Post(1);
Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
printReversedWords.Completion.Wait();

完成所有这些之后,我可以在控制台中看到如下日志:

Handling '<1, 2>'...

1

这意味着 ActionBlock printReversedWords 已被处理。但是,它仍然在最后一行等待,永远不会结束。

谁能告诉我应该更新什么?

要完成管道,您需要在链中的第一个块上调用 Complete。在您的情况下,您需要确保在 ab 两个块上都调用了 Complete()。您需要从 JoinBlock

传播完成

此外,与其阻塞调用 .Wait(),不如让方法 asyncawait 完成通常更好。

当您配置延续时,请确保处理异常并通过错误下游块成功完成,

//Propagate Completion    
join.LinkTo(transform, linkOptions); 
transform.LinkTo(printReversedWords, linkOptions);

a.Post(1);
b.Post(1);

//Complete the start of the pipeline
a.Complete();
b.Complete();

await Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
await printReversedWords.Completion;