等待 submitTopology 完成

Wait for submitToplogy to finish

我正在看风暴应用书。我在书中找到了下面的代码片段

LocalCluster lc = new LocalCluster()
lc.submitTopology("GitHub-commit-count-topology"), config, topology);
Utils.sleep(TEN_MINUTES)
lc.killTopology("GitHub-commit-count-topology")
lc.shutdown()

因此此代码将提交拓扑以供执行等待固定的 10 分钟,然后终止拓扑。但这很奇怪。我怎么能说。 submitTopology 等待它完成并完成。终止并关机。

就像在 Akka Streams 中一样,我们得到 Future[Done],我们只是等待那个未来完成。 (而不是固定的 10 分钟)。

您可以使用 https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/Testing.java#L376 来做到这一点。

在某些情况下不使用它的原因是它要求拓扑中的每个 spout 都实现 CompletableSpout 接口 https://github.com/apache/storm/blob/4137328b75c06771f84414c3c2113e2d1c757c08/storm-client/src/jvm/org/apache/storm/testing/CompletableSpout.java

大多数 Storm spouts 永远不会达到它们 "done" 的程度(因为它是流处理框架,而不是批处理框架),因此无法判断拓扑何时完成。例如,如果您正在消费来自 Kafka 主题的消息,生产者可能随时向该主题添加更多消息,那么消费者将如何确定它已完成消费?

CompletableSpout 的存在主要是为了简化测试,因为这样 spout 就可以说它是否完成了。我链接的 completeTopology 方法然后可以使用这个额外的功能来判断拓扑中的所有喷口是否 "done",然后可以停止拓扑。

如果您在测试中使用的 spout 没有实现 CompletableSpout(大多数 spout 没有实现),则通常无法判断拓扑何时完成。在许多情况下,您仍然可以比链接的示例做得更好,例如如果我的拓扑应该在测试中将 10 条消息写入队列,我可以在将 10 条消息写入队列后结束测试。

关于 Akka 流,我不是很熟悉它们,但是查看介绍性文档,您可以认为 CompletableSpouts 类似于有界源(例如 Source(1 to 100)),而 "normal" spout 是无限源(例如 Source.repeat(1))。