Storm KafkaSpout 如何知道所有的 bolts 都被执行了
How does Storm KafkaSpout know all bolts are executed
我的拓扑代码示例如下:
builder.setSpout("spout", new KafkaSpout);
builder.setBolt("bolt1", new Bolt1).shuffleGrouping("spout");
builder.setBolt("bolt2", new Bolt2).shuffleGrouping("bolt1");
builder.setBolt("bolt3", new Bolt3).shuffleGrouping("bolt2");
当 bolt1 发出时,消息将被自动确认。但是当bolt2或者bolt3出现异常时,这个消息是无法重发的,如何找回失败的消息?
Storm 的掌舵人是 tuple trees
的概念。让我尝试使用问题中提供的示例进行解释。
当您的 spout
调用 collector.emit
方法时,新发出的元组,我们称之为 tuple1
,被添加到 tuple tree
。这个元组到达 bolt1
因为它已经订阅了它并且将接收从 spout
发出的数据。一旦它在 execute
方法中接收到 tuple1
作为输入,在处理输入后,一个新值作为 tuple2
发出,它被添加到 tuple1
之后的元组树中。在退出 execute
方法之前,通过隐式调用 collector.ack
来确认元组,这告诉风暴 tuple1
已被处理,请将其从元组树中删除,现在仍然是 tuple2
这是传递给 bolt2
进行处理。
现在的问题是如果 bolt1
由于某种原因无法确认会发生什么。 Storm 会看到在一段时间后,即拓扑超时时间(默认为 30 秒),元组树还没有耗尽,因此它会从头开始重放元组,然后执行与上述相同的过程。
我的拓扑代码示例如下:
builder.setSpout("spout", new KafkaSpout);
builder.setBolt("bolt1", new Bolt1).shuffleGrouping("spout");
builder.setBolt("bolt2", new Bolt2).shuffleGrouping("bolt1");
builder.setBolt("bolt3", new Bolt3).shuffleGrouping("bolt2");
当 bolt1 发出时,消息将被自动确认。但是当bolt2或者bolt3出现异常时,这个消息是无法重发的,如何找回失败的消息?
Storm 的掌舵人是 tuple trees
的概念。让我尝试使用问题中提供的示例进行解释。
当您的 spout
调用 collector.emit
方法时,新发出的元组,我们称之为 tuple1
,被添加到 tuple tree
。这个元组到达 bolt1
因为它已经订阅了它并且将接收从 spout
发出的数据。一旦它在 execute
方法中接收到 tuple1
作为输入,在处理输入后,一个新值作为 tuple2
发出,它被添加到 tuple1
之后的元组树中。在退出 execute
方法之前,通过隐式调用 collector.ack
来确认元组,这告诉风暴 tuple1
已被处理,请将其从元组树中删除,现在仍然是 tuple2
这是传递给 bolt2
进行处理。
现在的问题是如果 bolt1
由于某种原因无法确认会发生什么。 Storm 会看到在一段时间后,即拓扑超时时间(默认为 30 秒),元组树还没有耗尽,因此它会从头开始重放元组,然后执行与上述相同的过程。