创建新的 Jet 自定义分区程序

Creating a new Jet custom Partitioner

我的用例需要从 Kafka 主题读取消息,并按照消息发布到 Kafka 时的自然顺序处理消息。

Kafka 生产者负责发布在单个 kafka 主题分区中排序的每组消息,我需要在同一个 Vertex-Processor 中以相同的顺序处理每组消息。

上图代表了基本思想。有一些 KafkaSource-Processors 从 Kafka 读取。

还有一条边连接到一个顶点来解码kafka消息等。

我可以使用 kafka 消息键作为分区键,但我认为我最终会得到不平衡的解码处理器。

鉴于:

您不需要为此使用分区程序。 Edge.isolated() 相等的本地并行性 是为此设计的:

dag.edge(between(kafkaSource, decode).isolated());

在这种情况下,源处理器的一个实例恰好与目标处理器的一个实例绑定,项目的顺序将被保留。请记住,单个 Kafka 源处理器可以从多个 Kafka 分区获取项目,因此您必须跟踪 Kafka 分区 ID。即使你让 Jet 处理器和 Kafka 分区的总数相等,你也不能依赖它,因为如果其中一个成员失败并且作业重新启动,Jet 处理器总数会减少但 Kafka 分区的数量会赢't.

另请注意,源的默认本地并行度不相等:对于 Kafka 源,它默认为 2,对于其他源,它通常等于 CPU 计数。您需要手动指定等值。

另一个限制是,如果您对 decode 顶点使用 Processors.mapP,则映射函数必须是无状态的。因为您需要订购商品,所以我假设您要保留一些状态。为了使其正常工作,您必须使用自定义处理器:

Vertex decode = dag.newVertex("decode", MyDecodeP::new);

处理器实现:

private static class MyDecodeP extends AbstractProcessor {
    private Object myStateObject;

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        Object mappedItem = ...;
        return tryEmit(mappedItem);
    }
}

答案是为 Jet 0.5.1 编写的。