Storm KafkaSpout 如何知道所有的 bolts 都被执行了

How does Storm KafkaSpout know all bolts are executed

我的拓扑代码示例如下:

builder.setSpout("spout", new KafkaSpout);
builder.setBolt("bolt1", new Bolt1).shuffleGrouping("spout");
builder.setBolt("bolt2", new Bolt2).shuffleGrouping("bolt1");
builder.setBolt("bolt3", new Bolt3).shuffleGrouping("bolt2");

当 bolt1 发出时,消息将被自动确认。但是当bolt2或者bolt3出现异常时,这个消息是无法重发的,如何找回失败的消息?

Storm 的掌舵人是 tuple trees 的概念。让我尝试使用问题中提供的示例进行解释。

当您的 spout 调用 collector.emit 方法时,新发出的元组,我们称之为 tuple1,被添加到 tuple tree。这个元组到达 bolt1 因为它已经订阅了它并且将接收从 spout 发出的数据。一旦它在 execute 方法中接收到 tuple1 作为输入,在处理输入后,一个新值作为 tuple2 发出,它被添加到 tuple1 之后的元组树中。在退出 execute 方法之前,通过隐式调用 collector.ack 来确认元组,这告诉风暴 tuple1 已被处理,请将其从元组树中删除,现在仍然是 tuple2 这是传递给 bolt2 进行处理。

现在的问题是如果 bolt1 由于某种原因无法确认会发生什么。 Storm 会看到在一段时间后,即拓扑超时时间(默认为 30 秒),元组树还没有耗尽,因此它会从头开始重放元组,然后执行与上述相同的过程。

希望我能够解释失败时会发生什么。更多详情请阅读this or watch this