如何在 JoinBlock 之后完成一个 Block
How to complete a Block after JoinBlock
我有下面的 senario 来使用 TPL 执行数据流。 a
和b
是一些数据源连接在一起,然后通过数据流。
var a = new TransformBlock<object, int>(_ => 1);
var b = new TransformBlock<object, int>(_ => 2);
var join = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });
var transform = new TransformBlock<Tuple<int, int>, int>(uri =>
{
Console.WriteLine("Handling '{0}'...", uri);
return uri.Item1;
});
var printReversedWords = new ActionBlock<int>(ddd => Console.WriteLine(ddd));
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
a.LinkTo(join.Target1);
b.LinkTo(join.Target2);
join.LinkTo(transform);
transform.LinkTo(printReversedWords, linkOptions);
a.Post(1);
b.Post(1);
Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
printReversedWords.Completion.Wait();
完成所有这些之后,我可以在控制台中看到如下日志:
Handling '<1, 2>'...
1
这意味着 ActionBlock printReversedWords
已被处理。但是,它仍然在最后一行等待,永远不会结束。
谁能告诉我应该更新什么?
要完成管道,您需要在链中的第一个块上调用 Complete
。在您的情况下,您需要确保在 a
和 b
两个块上都调用了 Complete()
。您需要从 JoinBlock
传播完成
此外,与其阻塞调用 .Wait()
,不如让方法 async
和 await
完成通常更好。
当您配置延续时,请确保处理异常并通过错误下游块成功完成,。
//Propagate Completion
join.LinkTo(transform, linkOptions);
transform.LinkTo(printReversedWords, linkOptions);
a.Post(1);
b.Post(1);
//Complete the start of the pipeline
a.Complete();
b.Complete();
await Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
await printReversedWords.Completion;
我有下面的 senario 来使用 TPL 执行数据流。 a
和b
是一些数据源连接在一起,然后通过数据流。
var a = new TransformBlock<object, int>(_ => 1);
var b = new TransformBlock<object, int>(_ => 2);
var join = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });
var transform = new TransformBlock<Tuple<int, int>, int>(uri =>
{
Console.WriteLine("Handling '{0}'...", uri);
return uri.Item1;
});
var printReversedWords = new ActionBlock<int>(ddd => Console.WriteLine(ddd));
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
a.LinkTo(join.Target1);
b.LinkTo(join.Target2);
join.LinkTo(transform);
transform.LinkTo(printReversedWords, linkOptions);
a.Post(1);
b.Post(1);
Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
printReversedWords.Completion.Wait();
完成所有这些之后,我可以在控制台中看到如下日志:
Handling '<1, 2>'...
1
这意味着 ActionBlock printReversedWords
已被处理。但是,它仍然在最后一行等待,永远不会结束。
谁能告诉我应该更新什么?
要完成管道,您需要在链中的第一个块上调用 Complete
。在您的情况下,您需要确保在 a
和 b
两个块上都调用了 Complete()
。您需要从 JoinBlock
此外,与其阻塞调用 .Wait()
,不如让方法 async
和 await
完成通常更好。
当您配置延续时,请确保处理异常并通过错误下游块成功完成,
//Propagate Completion
join.LinkTo(transform, linkOptions);
transform.LinkTo(printReversedWords, linkOptions);
a.Post(1);
b.Post(1);
//Complete the start of the pipeline
a.Complete();
b.Complete();
await Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
await printReversedWords.Completion;