等待 submitTopology 完成
Wait for submitToplogy to finish
我正在看风暴应用书。我在书中找到了下面的代码片段
LocalCluster lc = new LocalCluster()
lc.submitTopology("GitHub-commit-count-topology"), config, topology);
Utils.sleep(TEN_MINUTES)
lc.killTopology("GitHub-commit-count-topology")
lc.shutdown()
因此此代码将提交拓扑以供执行等待固定的 10 分钟,然后终止拓扑。但这很奇怪。我怎么能说。 submitTopology 等待它完成并完成。终止并关机。
就像在 Akka Streams 中一样,我们得到 Future[Done]
,我们只是等待那个未来完成。 (而不是固定的 10 分钟)。
您可以使用 https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/Testing.java#L376 来做到这一点。
在某些情况下不使用它的原因是它要求拓扑中的每个 spout 都实现 CompletableSpout 接口 https://github.com/apache/storm/blob/4137328b75c06771f84414c3c2113e2d1c757c08/storm-client/src/jvm/org/apache/storm/testing/CompletableSpout.java。
大多数 Storm spouts 永远不会达到它们 "done" 的程度(因为它是流处理框架,而不是批处理框架),因此无法判断拓扑何时完成。例如,如果您正在消费来自 Kafka 主题的消息,生产者可能随时向该主题添加更多消息,那么消费者将如何确定它已完成消费?
CompletableSpout 的存在主要是为了简化测试,因为这样 spout 就可以说它是否完成了。我链接的 completeTopology 方法然后可以使用这个额外的功能来判断拓扑中的所有喷口是否 "done",然后可以停止拓扑。
如果您在测试中使用的 spout 没有实现 CompletableSpout(大多数 spout 没有实现),则通常无法判断拓扑何时完成。在许多情况下,您仍然可以比链接的示例做得更好,例如如果我的拓扑应该在测试中将 10 条消息写入队列,我可以在将 10 条消息写入队列后结束测试。
关于 Akka 流,我不是很熟悉它们,但是查看介绍性文档,您可以认为 CompletableSpouts 类似于有界源(例如 Source(1 to 100)
),而 "normal" spout 是无限源(例如 Source.repeat(1)
)。
我正在看风暴应用书。我在书中找到了下面的代码片段
LocalCluster lc = new LocalCluster()
lc.submitTopology("GitHub-commit-count-topology"), config, topology);
Utils.sleep(TEN_MINUTES)
lc.killTopology("GitHub-commit-count-topology")
lc.shutdown()
因此此代码将提交拓扑以供执行等待固定的 10 分钟,然后终止拓扑。但这很奇怪。我怎么能说。 submitTopology 等待它完成并完成。终止并关机。
就像在 Akka Streams 中一样,我们得到 Future[Done]
,我们只是等待那个未来完成。 (而不是固定的 10 分钟)。
您可以使用 https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/Testing.java#L376 来做到这一点。
在某些情况下不使用它的原因是它要求拓扑中的每个 spout 都实现 CompletableSpout 接口 https://github.com/apache/storm/blob/4137328b75c06771f84414c3c2113e2d1c757c08/storm-client/src/jvm/org/apache/storm/testing/CompletableSpout.java。
大多数 Storm spouts 永远不会达到它们 "done" 的程度(因为它是流处理框架,而不是批处理框架),因此无法判断拓扑何时完成。例如,如果您正在消费来自 Kafka 主题的消息,生产者可能随时向该主题添加更多消息,那么消费者将如何确定它已完成消费?
CompletableSpout 的存在主要是为了简化测试,因为这样 spout 就可以说它是否完成了。我链接的 completeTopology 方法然后可以使用这个额外的功能来判断拓扑中的所有喷口是否 "done",然后可以停止拓扑。
如果您在测试中使用的 spout 没有实现 CompletableSpout(大多数 spout 没有实现),则通常无法判断拓扑何时完成。在许多情况下,您仍然可以比链接的示例做得更好,例如如果我的拓扑应该在测试中将 10 条消息写入队列,我可以在将 10 条消息写入队列后结束测试。
关于 Akka 流,我不是很熟悉它们,但是查看介绍性文档,您可以认为 CompletableSpouts 类似于有界源(例如 Source(1 to 100)
),而 "normal" spout 是无限源(例如 Source.repeat(1)
)。