KTable聚合转发相同的消息
KTable aggregate forwards the same messages
我正在使用 kafka-streams 将消息聚合到 KTable 中。在我的聚合逻辑中,我总是 return 相同的累加器,如下所示:
streamOfInts
.groupByKey()
.aggregate(Accumulator.empty()) {k,v,acc -> acc}
.toStream()
.to(...)
我的期望是——因为 KTable 的值没有改变——不会向下游发送任何值。然而,这种情况并非如此。聚合函数始终转发更新。
确保产生相同(或相等)值的更新不会导致下游转发的最佳方法是什么?
DSL 运营商发出 "on update" 而不是 "on change" 的设计 atm。有一张 JIRA 票据建议添加 "emit on change" 语义 (https://issues.apache.org/jira/browse/KAFKA-8770)。
作为解决方法,您可以使用状态存储实现自定义 transform()
-- 对于每个输入记录,您检查存储是否是新的(-> 发出并放入存储)或者它是否已更改( -> 发出并更新商店)。如果它存在并且没有改变,则不要发出任何东西。
我正在使用 kafka-streams 将消息聚合到 KTable 中。在我的聚合逻辑中,我总是 return 相同的累加器,如下所示:
streamOfInts
.groupByKey()
.aggregate(Accumulator.empty()) {k,v,acc -> acc}
.toStream()
.to(...)
我的期望是——因为 KTable 的值没有改变——不会向下游发送任何值。然而,这种情况并非如此。聚合函数始终转发更新。
确保产生相同(或相等)值的更新不会导致下游转发的最佳方法是什么?
DSL 运营商发出 "on update" 而不是 "on change" 的设计 atm。有一张 JIRA 票据建议添加 "emit on change" 语义 (https://issues.apache.org/jira/browse/KAFKA-8770)。
作为解决方法,您可以使用状态存储实现自定义 transform()
-- 对于每个输入记录,您检查存储是否是新的(-> 发出并放入存储)或者它是否已更改( -> 发出并更新商店)。如果它存在并且没有改变,则不要发出任何东西。