Storm acker的混淆与有保障的消息处理
Confusion of Storm acker and guaranteed message processing
现在正在学习Storm的Guaranteeing Message Processing,对这部分的一些概念比较迷惑。
为了保证 spout 发出的消息被完全处理,Storm 使用 acker 来实现这一点。每次 spout 发出一个元组时,acker 都会分配初始化为 0 的 "ack val" 来存储元组树的状态。每次此元组的下游螺栓发出新元组或确认 "old" 元组时,元组 ID 将与 "ack val" 异或。 acker 只需要检查 "ack val" 是否为 0 就知道元组已被完全处理。让我们看看下面的代码:
public class WordReader implements IRichSpout {
... ...
while((str = reader.readLine()) != null){
this.collector.emit(new Values(str), str);
... ...
}
上面的代码片段是 "Getting Started with Storm" 教程中字数统计程序中的一个 spout。在 emit 方法中,第二个参数 "str" 是 messageId。我对这个参数感到困惑:
1) 据我了解,无论是在 spouts 还是在 bolts 中,每次发出一个元组(即一条消息)时,Storm 应该负责为该消息分配一个 64 位的 messageId。那是对的吗?或者此处 "str" 只是此消息的人类可读别名?
2) 无论 1) 的答案是什么,这里的 "str" 在两条不同的消息中都是同一个词,因为在文本文件中应该有很多重复的词。如果这是真的,那么 Storm 是如何区分不同消息的呢?这个参数是什么意思?
3) 在一些代码片段中,我看到一些 spouts 使用以下代码在 Spout emit 方法中设置消息 Id:
public class RandomIntegerSpout extends BaseRichSpout {
private long msgId = 0;
collector.emit(new Values(..., ++msgId), msgId);
}
这更接近于我的想法:不同消息的消息 ID 应该完全不同。但是对于这段代码,另一个困惑是:private field "msgId" 跨不同的 executor 会发生什么?因为每个executor都有自己的msgId初始化为0,那么不同executor中的消息会被命名为0、1、2,以此类推。那么Storm是如何区分这些消息的呢?
我是 Storm 的新手,所以这些问题可能很幼稚。希望有人能帮我弄清楚。谢谢!
关于消息 ID 是通用的:在内部它可能是一个 64 位值,但这个 64 位值是从 Spout 中 emit()
中提供的 msgID
对象计算的散列值。所以你可以将任何对象作为消息ID(两个对象哈希到相同值的概率接近于零)。
关于使用 str
:我认为在这个例子中,str
包含一行(而不是一个词)并且文档不太可能两次包含完全相同的行(如果有没有可能有很多空行)。
关于作为消息 ID 的计数器:您的观察是绝对正确的——如果多个 spout 运行 并行,这将导致消息 ID 冲突并破坏容错。
如果你想"fix"计数器方法,每个计数器应该不同地初始化(最好,从1...#SpoutTasks
开始)。您可以为此使用任务 ID(它是唯一的,可以通过 Spout.open()
中提供的 TopologyContext
访问)。基本上,您获取所有并行 spout 任务的所有 taskID,对它们进行排序,并为每个 spout 任务分配其顺序号。此外,您需要增加 "number of parallel spouts" 而不是 1
.
现在正在学习Storm的Guaranteeing Message Processing,对这部分的一些概念比较迷惑。
为了保证 spout 发出的消息被完全处理,Storm 使用 acker 来实现这一点。每次 spout 发出一个元组时,acker 都会分配初始化为 0 的 "ack val" 来存储元组树的状态。每次此元组的下游螺栓发出新元组或确认 "old" 元组时,元组 ID 将与 "ack val" 异或。 acker 只需要检查 "ack val" 是否为 0 就知道元组已被完全处理。让我们看看下面的代码:
public class WordReader implements IRichSpout {
... ...
while((str = reader.readLine()) != null){
this.collector.emit(new Values(str), str);
... ...
}
上面的代码片段是 "Getting Started with Storm" 教程中字数统计程序中的一个 spout。在 emit 方法中,第二个参数 "str" 是 messageId。我对这个参数感到困惑: 1) 据我了解,无论是在 spouts 还是在 bolts 中,每次发出一个元组(即一条消息)时,Storm 应该负责为该消息分配一个 64 位的 messageId。那是对的吗?或者此处 "str" 只是此消息的人类可读别名? 2) 无论 1) 的答案是什么,这里的 "str" 在两条不同的消息中都是同一个词,因为在文本文件中应该有很多重复的词。如果这是真的,那么 Storm 是如何区分不同消息的呢?这个参数是什么意思? 3) 在一些代码片段中,我看到一些 spouts 使用以下代码在 Spout emit 方法中设置消息 Id:
public class RandomIntegerSpout extends BaseRichSpout {
private long msgId = 0;
collector.emit(new Values(..., ++msgId), msgId);
}
这更接近于我的想法:不同消息的消息 ID 应该完全不同。但是对于这段代码,另一个困惑是:private field "msgId" 跨不同的 executor 会发生什么?因为每个executor都有自己的msgId初始化为0,那么不同executor中的消息会被命名为0、1、2,以此类推。那么Storm是如何区分这些消息的呢?
我是 Storm 的新手,所以这些问题可能很幼稚。希望有人能帮我弄清楚。谢谢!
关于消息 ID 是通用的:在内部它可能是一个 64 位值,但这个 64 位值是从 Spout 中 emit()
中提供的 msgID
对象计算的散列值。所以你可以将任何对象作为消息ID(两个对象哈希到相同值的概率接近于零)。
关于使用 str
:我认为在这个例子中,str
包含一行(而不是一个词)并且文档不太可能两次包含完全相同的行(如果有没有可能有很多空行)。
关于作为消息 ID 的计数器:您的观察是绝对正确的——如果多个 spout 运行 并行,这将导致消息 ID 冲突并破坏容错。
如果你想"fix"计数器方法,每个计数器应该不同地初始化(最好,从1...#SpoutTasks
开始)。您可以为此使用任务 ID(它是唯一的,可以通过 Spout.open()
中提供的 TopologyContext
访问)。基本上,您获取所有并行 spout 任务的所有 taskID,对它们进行排序,并为每个 spout 任务分配其顺序号。此外,您需要增加 "number of parallel spouts" 而不是 1
.