是否指定了 Kafka Streams 拓扑的处理顺序?
Is the processing order of a Kafka Streams topology specified?
我想知道是否指定了流拓扑处理消息的顺序。
示例:
// read input messages
KStream<String, String> inputMessages = builder.stream("demo_input_topic_1");
inputMessages = inputMessages.peek((k, v) -> System.out.println("TECHN. NEW MESSAGE: key: " + k + ", value: " + v));
// check if message was already processed
KTable<String, Long> alreadyProcessedMessages = inputMessages.groupByKey().count();
KStream<String, String> newMessages =
inputMessages.leftJoin(alreadyProcessedMessages, (streamValue, tableValue) -> getMessageValueOrNullIfKnownMessage(streamValue, tableValue));
KStream<String, String> filteredNewMessages =
newMessages.filter((key, val) -> val != null).peek((k, v) -> System.out.println("FUNC. NEW MESSAGE: key: " + k + ", value: " + v));
// process the message
filteredNewMessages.map((key, value) -> KeyValue.pair(key, "processed message: " + value))
.peek((k, v) -> System.out.println("PROCESSED MESSAGE: key: " + k + ", value: " + v)).to("demo_output_topic_1");
与getMessageValueOrNullIfKnownMessage(...)
:
private static String getMessageValueOrNullIfKnownMessage(String newMessageValue, Long messageCounter) {
if (messageCounter > 1) {
return null;
}
return newMessageValue;
}
因此示例中只有一个输入和一个输出主题。
在 alreadyProcessedMessages
中对输入主题进行计数(因此创建了本地状态)。此外,输入主题与计数 table alreadyProcessedMessages
连接,连接的结果是流 newMessages
(如果该流中消息的值为 null
消息计数 > 1,否则为消息的原始值)。
然后,过滤 newMessages
的消息(过滤掉 null
值)并将结果写入输出主题。
那么这个最小流的作用是:它将输入主题的所有消息写入具有新密钥(之前未处理过的密钥)的输出主题。
在流工作的测试中。但我认为这并不能保证。它之所以有效,是因为消息在加入之前首先由计数节点处理。
但是这个订单有保证吗?
据我在所有文档中看到的,无法保证此处理顺序。因此,如果有新消息到达,也可能会发生这种情况:
- 消息由 "join node" 处理。
- 消息由 "counting node" 处理。
这当然会产生不同的结果(所以在这种情况下,如果第二次出现具有相同键的消息,它仍然会与原始值连接,因为它还没有被计算在内)。
那么在某处指定了处理顺序吗?
我知道在新版本的 Kafka 中,KStream-KTable 连接是根据输入分区中消息的时间戳完成的。但这在这里没有帮助,因为拓扑使用相同的输入分区(因为它是相同的消息)。
谢谢
这只是缩小开放问题范围的部分答案:
在(Confluent's Stream Architecture overview)中说了一个"depth-first processing strategy"用来遍历拓扑。没有提到在多个路径上的相同输入可以到达的节点处进行同步。
(但是,在 1 的详细级别上,基于此排除它会很困难。)
关于DFS遍历分支的顺序,我没有找到明确的说法。然而,在此 Confluent documenation on namings within the topology 中,一些示例显示了 "operator's order in the topology"。现在可以假定此顺序。这似乎是由源代码中 DSL 运算符的顺序给出的,也是执行顺序。这将提供您所要求的保证。但是我无法通过任何其他来源证实该假设。
剩下两个问题可以通过在 PAPI 实现中找到相关的源代码来回答。
- 真的只是没有同步点的普通 DFS 遍历吗?
- DFS中的分支顺序真的是2中定义的算子顺序吗?如果不是,那是什么?
无法保证。即使在当前的实现中,使用了 List
个子节点:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L203-L206 -- 但是,不能保证子节点按照它们在DSL(因为中间有一个翻译层,可能会以不同的顺序添加节点)。此外,实施可能会随时更改。
我能想到的唯一解决方法(相当昂贵)可能 工作是,在重新分区主题中发送流端数据:
KStream<String, String> newMessages =
inputMessages.through(...) // note: as of 2.6.0 release, you could use `repartition()` instead of `through()`
.leftJoin(alreadyProcessedMessages, ...);
这样,KTable 将在执行连接之前更新,因为需要先读回记录。但是,由于您在回读记录时没有任何保证,因此在连接完成之前可能会对 table 进行多次更新,这可能会使您处于与以前类似的情况。 (此外,通过其他主题重新路由数据有点昂贵。)
使用处理器 API,您可以调用 context.forward(..., To.child(...))
来控制移动。但是,对于这种情况,您还需要实现聚合并手动加入:
KStream routing = inputMessages.transform(...);
routing.groupByKey(...);
routing.leftJoin(...);
对于这种情况,您会在 transform()
之后获得要避免的重新分区主题:
KStream routing = inputMessages.transform(...);
routing.transform(...); // implement the aggregation
routing.transform(...); // implement the join
连续 transform()
将 不会 触发自动重新分区。
我想知道是否指定了流拓扑处理消息的顺序。
示例:
// read input messages
KStream<String, String> inputMessages = builder.stream("demo_input_topic_1");
inputMessages = inputMessages.peek((k, v) -> System.out.println("TECHN. NEW MESSAGE: key: " + k + ", value: " + v));
// check if message was already processed
KTable<String, Long> alreadyProcessedMessages = inputMessages.groupByKey().count();
KStream<String, String> newMessages =
inputMessages.leftJoin(alreadyProcessedMessages, (streamValue, tableValue) -> getMessageValueOrNullIfKnownMessage(streamValue, tableValue));
KStream<String, String> filteredNewMessages =
newMessages.filter((key, val) -> val != null).peek((k, v) -> System.out.println("FUNC. NEW MESSAGE: key: " + k + ", value: " + v));
// process the message
filteredNewMessages.map((key, value) -> KeyValue.pair(key, "processed message: " + value))
.peek((k, v) -> System.out.println("PROCESSED MESSAGE: key: " + k + ", value: " + v)).to("demo_output_topic_1");
与getMessageValueOrNullIfKnownMessage(...)
:
private static String getMessageValueOrNullIfKnownMessage(String newMessageValue, Long messageCounter) {
if (messageCounter > 1) {
return null;
}
return newMessageValue;
}
因此示例中只有一个输入和一个输出主题。
在 alreadyProcessedMessages
中对输入主题进行计数(因此创建了本地状态)。此外,输入主题与计数 table alreadyProcessedMessages
连接,连接的结果是流 newMessages
(如果该流中消息的值为 null
消息计数 > 1,否则为消息的原始值)。
然后,过滤 newMessages
的消息(过滤掉 null
值)并将结果写入输出主题。
那么这个最小流的作用是:它将输入主题的所有消息写入具有新密钥(之前未处理过的密钥)的输出主题。
在流工作的测试中。但我认为这并不能保证。它之所以有效,是因为消息在加入之前首先由计数节点处理。
但是这个订单有保证吗?
据我在所有文档中看到的,无法保证此处理顺序。因此,如果有新消息到达,也可能会发生这种情况:
- 消息由 "join node" 处理。
- 消息由 "counting node" 处理。
这当然会产生不同的结果(所以在这种情况下,如果第二次出现具有相同键的消息,它仍然会与原始值连接,因为它还没有被计算在内)。
那么在某处指定了处理顺序吗?
我知道在新版本的 Kafka 中,KStream-KTable 连接是根据输入分区中消息的时间戳完成的。但这在这里没有帮助,因为拓扑使用相同的输入分区(因为它是相同的消息)。
谢谢
这只是缩小开放问题范围的部分答案:
在(Confluent's Stream Architecture overview)中说了一个"depth-first processing strategy"用来遍历拓扑。没有提到在多个路径上的相同输入可以到达的节点处进行同步。 (但是,在 1 的详细级别上,基于此排除它会很困难。)
关于DFS遍历分支的顺序,我没有找到明确的说法。然而,在此 Confluent documenation on namings within the topology 中,一些示例显示了 "operator's order in the topology"。现在可以假定此顺序。这似乎是由源代码中 DSL 运算符的顺序给出的,也是执行顺序。这将提供您所要求的保证。但是我无法通过任何其他来源证实该假设。
剩下两个问题可以通过在 PAPI 实现中找到相关的源代码来回答。
- 真的只是没有同步点的普通 DFS 遍历吗?
- DFS中的分支顺序真的是2中定义的算子顺序吗?如果不是,那是什么?
无法保证。即使在当前的实现中,使用了 List
个子节点:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L203-L206 -- 但是,不能保证子节点按照它们在DSL(因为中间有一个翻译层,可能会以不同的顺序添加节点)。此外,实施可能会随时更改。
我能想到的唯一解决方法(相当昂贵)可能 工作是,在重新分区主题中发送流端数据:
KStream<String, String> newMessages =
inputMessages.through(...) // note: as of 2.6.0 release, you could use `repartition()` instead of `through()`
.leftJoin(alreadyProcessedMessages, ...);
这样,KTable 将在执行连接之前更新,因为需要先读回记录。但是,由于您在回读记录时没有任何保证,因此在连接完成之前可能会对 table 进行多次更新,这可能会使您处于与以前类似的情况。 (此外,通过其他主题重新路由数据有点昂贵。)
使用处理器 API,您可以调用 context.forward(..., To.child(...))
来控制移动。但是,对于这种情况,您还需要实现聚合并手动加入:
KStream routing = inputMessages.transform(...);
routing.groupByKey(...);
routing.leftJoin(...);
对于这种情况,您会在 transform()
之后获得要避免的重新分区主题:
KStream routing = inputMessages.transform(...);
routing.transform(...); // implement the aggregation
routing.transform(...); // implement the join
连续 transform()
将 不会 触发自动重新分区。