创建新的 Jet 自定义分区程序
Creating a new Jet custom Partitioner
我的用例需要从 Kafka 主题读取消息,并按照消息发布到 Kafka 时的自然顺序处理消息。
Kafka 生产者负责发布在单个 kafka 主题分区中排序的每组消息,我需要在同一个 Vertex-Processor 中以相同的顺序处理每组消息。
上图代表了基本思想。有一些 KafkaSource-Processors 从 Kafka 读取。
还有一条边连接到一个顶点来解码kafka消息等。
我可以使用 kafka 消息键作为分区键,但我认为我最终会得到不平衡的解码处理器。
鉴于:
- 如何创建新的分区程序?我找不到任何例子来启发我。
- 在新的分区程序上,我如何识别发出消息的 KS 处理器?我想在前一个顶点进程和下一个顶点处理器之间建立一对一的关系,例如,KS#0 总是将消息发送到 Decode#0,KS#1 到 Decode#1 等等。
- 我需要一个新的分区程序吗?或者是否有一些开箱即用的功能可以实现?
您不需要为此使用分区程序。 Edge.isolated()
和 相等的本地并行性 是为此设计的:
dag.edge(between(kafkaSource, decode).isolated());
在这种情况下,源处理器的一个实例恰好与目标处理器的一个实例绑定,项目的顺序将被保留。请记住,单个 Kafka 源处理器可以从多个 Kafka 分区获取项目,因此您必须跟踪 Kafka 分区 ID。即使你让 Jet 处理器和 Kafka 分区的总数相等,你也不能依赖它,因为如果其中一个成员失败并且作业重新启动,Jet 处理器总数会减少但 Kafka 分区的数量会赢't.
另请注意,源的默认本地并行度不相等:对于 Kafka 源,它默认为 2,对于其他源,它通常等于 CPU 计数。您需要手动指定等值。
另一个限制是,如果您对 decode
顶点使用 Processors.mapP
,则映射函数必须是无状态的。因为您需要订购商品,所以我假设您要保留一些状态。为了使其正常工作,您必须使用自定义处理器:
Vertex decode = dag.newVertex("decode", MyDecodeP::new);
处理器实现:
private static class MyDecodeP extends AbstractProcessor {
private Object myStateObject;
@Override
protected boolean tryProcess(int ordinal, @Nonnull Object item) {
Object mappedItem = ...;
return tryEmit(mappedItem);
}
}
答案是为 Jet 0.5.1 编写的。
我的用例需要从 Kafka 主题读取消息,并按照消息发布到 Kafka 时的自然顺序处理消息。
Kafka 生产者负责发布在单个 kafka 主题分区中排序的每组消息,我需要在同一个 Vertex-Processor 中以相同的顺序处理每组消息。
上图代表了基本思想。有一些 KafkaSource-Processors 从 Kafka 读取。
还有一条边连接到一个顶点来解码kafka消息等。
我可以使用 kafka 消息键作为分区键,但我认为我最终会得到不平衡的解码处理器。
鉴于:
- 如何创建新的分区程序?我找不到任何例子来启发我。
- 在新的分区程序上,我如何识别发出消息的 KS 处理器?我想在前一个顶点进程和下一个顶点处理器之间建立一对一的关系,例如,KS#0 总是将消息发送到 Decode#0,KS#1 到 Decode#1 等等。
- 我需要一个新的分区程序吗?或者是否有一些开箱即用的功能可以实现?
您不需要为此使用分区程序。 Edge.isolated()
和 相等的本地并行性 是为此设计的:
dag.edge(between(kafkaSource, decode).isolated());
在这种情况下,源处理器的一个实例恰好与目标处理器的一个实例绑定,项目的顺序将被保留。请记住,单个 Kafka 源处理器可以从多个 Kafka 分区获取项目,因此您必须跟踪 Kafka 分区 ID。即使你让 Jet 处理器和 Kafka 分区的总数相等,你也不能依赖它,因为如果其中一个成员失败并且作业重新启动,Jet 处理器总数会减少但 Kafka 分区的数量会赢't.
另请注意,源的默认本地并行度不相等:对于 Kafka 源,它默认为 2,对于其他源,它通常等于 CPU 计数。您需要手动指定等值。
另一个限制是,如果您对 decode
顶点使用 Processors.mapP
,则映射函数必须是无状态的。因为您需要订购商品,所以我假设您要保留一些状态。为了使其正常工作,您必须使用自定义处理器:
Vertex decode = dag.newVertex("decode", MyDecodeP::new);
处理器实现:
private static class MyDecodeP extends AbstractProcessor {
private Object myStateObject;
@Override
protected boolean tryProcess(int ordinal, @Nonnull Object item) {
Object mappedItem = ...;
return tryEmit(mappedItem);
}
}
答案是为 Jet 0.5.1 编写的。