是否指定了 Kafka Streams 拓扑的处理顺序?

Is the processing order of a Kafka Streams topology specified?

我想知道是否指定了流拓扑处理消息的顺序。

示例:

        // read input messages

        KStream<String, String> inputMessages = builder.stream("demo_input_topic_1");
        inputMessages = inputMessages.peek((k, v) -> System.out.println("TECHN. NEW MESSAGE: key: " + k + ", value: " + v));

        // check if message was already processed

        KTable<String, Long> alreadyProcessedMessages = inputMessages.groupByKey().count();
        KStream<String, String> newMessages =
                inputMessages.leftJoin(alreadyProcessedMessages, (streamValue, tableValue) -> getMessageValueOrNullIfKnownMessage(streamValue, tableValue));
        KStream<String, String> filteredNewMessages =
                newMessages.filter((key, val) -> val != null).peek((k, v) -> System.out.println("FUNC. NEW MESSAGE: key: " + k + ", value: " + v));

        // process the message

        filteredNewMessages.map((key, value) -> KeyValue.pair(key, "processed message: " + value))
                .peek((k, v) -> System.out.println("PROCESSED MESSAGE: key: " + k + ", value: " + v)).to("demo_output_topic_1");

getMessageValueOrNullIfKnownMessage(...):

    private static String getMessageValueOrNullIfKnownMessage(String newMessageValue, Long messageCounter) {
        if (messageCounter > 1) {
            return null;
        }

        return newMessageValue;
    }

因此示例中只有一个输入和一个输出主题。

alreadyProcessedMessages 中对输入主题进行计数(因此创建了本地状态)。此外,输入主题与计数 table alreadyProcessedMessages 连接,连接的结果是流 newMessages (如果该流中消息的值为 null消息计数 > 1,否则为消息的原始值)。

然后,过滤 newMessages 的消息(过滤掉 null 值)并将结果写入输出主题。

那么这个最小流的作用是:它将输入主题的所有消息写入具有新密钥(之前未处理过的密钥)的输出主题。

在流工作的测试中。但我认为这并不能保证。它之所以有效,是因为消息在加入之前首先由计数节点处理。

但是这个订单有保证吗?

据我在所有文档中看到的,无法保证此处理顺序。因此,如果有新消息到达,也可能会发生这种情况:

这当然会产生不同的结果(所以在这种情况下,如果第二次出现具有相同键的消息,它仍然会与原始值连接,因为它还没有被计算在内)。

那么在某处指定了处理顺序吗?

我知道在新版本的 Kafka 中,KStream-KTable 连接是根据输入分区中消息的时间戳完成的。但这在这里没有帮助,因为拓扑使用相同的输入分区(因为它是相同的消息)。

谢谢

这只是缩小开放问题范围的部分答案:

在(Confluent's Stream Architecture overview)中说了一个"depth-first processing strategy"用来遍历拓扑。没有提到在多个路径上的相同输入可以到达的节点处进行同步。 (但是,在 1 的详细级别上,基于此排除它会很困难。)

关于DFS遍历分支的顺序,我没有找到明确的说法。然而,在此 Confluent documenation on namings within the topology 中,一些示例显示了 "operator's order in the topology"。现在可以假定此顺序。这似乎是由源代码中 DSL 运算符的顺序给出的,也是执行顺序。这将提供您所要求的保证。但是我无法通过任何其他来源证实该假设。

剩下两个问题可以通过在 PAPI 实现中找到相关的源代码来回答。

  1. 真的只是没有同步点的普通 DFS 遍历吗?
  2. DFS中的分支顺序真的是2中定义的算子顺序吗?如果不是,那是什么?

无法保证。即使在当前的实现中,使用了 List 个子节点:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L203-L206 -- 但是,不能保证子节点按照它们在DSL(因为中间有一个翻译层,可能会以不同的顺序添加节点)。此外,实施可能会随时更改。

我能想到的唯一解决方法(相当昂贵)可能 工作是,在重新分区主题中发送流端数据:

KStream<String, String> newMessages =
   inputMessages.through(...) // note: as of 2.6.0 release, you could use `repartition()` instead of `through()`
                .leftJoin(alreadyProcessedMessages, ...);

这样,KTable 将在执行连接之前更新,因为需要先读回记录。但是,由于您在回读记录时没有任何保证,因此在连接完成之前可能会对 table 进行多次更新,这可能会使您处于与以前类似的情况。 (此外,通过其他主题重新路由数据有点昂贵。)

使用处理器 API,您可以调用 context.forward(..., To.child(...)) 来控制移动。但是,对于这种情况,您还需要实现聚合并手动加入:

KStream routing = inputMessages.transform(...);
routing.groupByKey(...);
routing.leftJoin(...);

对于这种情况,您会在 transform() 之后获得要避免的重新分区主题:

KStream routing = inputMessages.transform(...);
routing.transform(...); // implement the aggregation
routing.transform(...); // implement the join

连续 transform() 不会 触发自动重新分区。