如何正确终止包含循环的图?

How to properly terminate a graph that contains a loop?

我有一个 SinkShape 图表,里面有一个循环:

source.take(10) ~> merge ~> process ~> split ~> out
                   merge.preferred  <~ split        

我在内部确保每个 Stream 元素的循环次数是有限的。

我用以下代码具体化了这张图:

val result: Future[Int] = Source.fromGraph(graph).runWith(Sink.fold(...))

现在我尝试获取此 Future 的值,但从未调用 onComplete 回调。图中的原始源应该正确终止,因为我添加了 .take(10) 语句。

我也试过删除循环,并且 Future 按预期生成

Merge 阶段(以及 MergePreferred)可以急切或不急切完成。 换句话说,您可以告诉您的阶段在其输入的 any 完成时或在其输入的 all 完成时完成。

在你的情况下,你需要一个急切的完成,因为 2 个输入之一(即标记为首选的那个)由于环回而永远不会完成。

Merge 阶段在布尔标志中公开完成行为,默认为 false(请参阅 docs 了解更多信息)。尝试将其更改为 true。

val merge = b.add(MergePreferred(secondaryPorts = 2, eagerComplete = true))