如何正确终止包含循环的图?
How to properly terminate a graph that contains a loop?
我有一个 SinkShape
图表,里面有一个循环:
source.take(10) ~> merge ~> process ~> split ~> out
merge.preferred <~ split
我在内部确保每个 Stream
元素的循环次数是有限的。
我用以下代码具体化了这张图:
val result: Future[Int] = Source.fromGraph(graph).runWith(Sink.fold(...))
现在我尝试获取此 Future
的值,但从未调用 onComplete
回调。图中的原始源应该正确终止,因为我添加了 .take(10)
语句。
我也试过删除循环,并且 Future
按预期生成
Merge
阶段(以及 MergePreferred
)可以急切或不急切完成。
换句话说,您可以告诉您的阶段在其输入的 any 完成时或在其输入的 all 完成时完成。
在你的情况下,你需要一个急切的完成,因为 2 个输入之一(即标记为首选的那个)由于环回而永远不会完成。
Merge
阶段在布尔标志中公开完成行为,默认为 false(请参阅 docs 了解更多信息)。尝试将其更改为 true。
val merge = b.add(MergePreferred(secondaryPorts = 2, eagerComplete = true))
我有一个 SinkShape
图表,里面有一个循环:
source.take(10) ~> merge ~> process ~> split ~> out
merge.preferred <~ split
我在内部确保每个 Stream
元素的循环次数是有限的。
我用以下代码具体化了这张图:
val result: Future[Int] = Source.fromGraph(graph).runWith(Sink.fold(...))
现在我尝试获取此 Future
的值,但从未调用 onComplete
回调。图中的原始源应该正确终止,因为我添加了 .take(10)
语句。
我也试过删除循环,并且 Future
按预期生成
Merge
阶段(以及 MergePreferred
)可以急切或不急切完成。
换句话说,您可以告诉您的阶段在其输入的 any 完成时或在其输入的 all 完成时完成。
在你的情况下,你需要一个急切的完成,因为 2 个输入之一(即标记为首选的那个)由于环回而永远不会完成。
Merge
阶段在布尔标志中公开完成行为,默认为 false(请参阅 docs 了解更多信息)。尝试将其更改为 true。
val merge = b.add(MergePreferred(secondaryPorts = 2, eagerComplete = true))