StateStoreSupplier 在 KafkaStreams 中存储序列
StateStoreSupplier to store a sequence in KafkaStreams
我需要对来自两个主题(使用外部联接合并)的数据重新排序。使用 StateStore
保持最新序列并使用重新排序的消息修改下游流值是一种很好的做法。
简化问题:
(seq from topic A, seq from topic B) -> new seq to output (保持当前序列在StateStore
)
(10,100) -> 1
(11,101) -> 2
(12,102) -> 3
(...,...) -> ...
新序列将作为键 "currentSeq" 的值存储在 stateStore 中。序列将在每条消息上递增并存储回 stateStore。
您应该使用具有已注册(可能是自定义)状态的处理器 API。
您还可以使用 process()
、transform()
或 transformValue()
将处理器 API 与 DSL 混合搭配,并引用状态存储(按名称)。
见
我需要对来自两个主题(使用外部联接合并)的数据重新排序。使用 StateStore
保持最新序列并使用重新排序的消息修改下游流值是一种很好的做法。
简化问题:
(seq from topic A, seq from topic B) -> new seq to output (保持当前序列在StateStore
)
(10,100) -> 1
(11,101) -> 2
(12,102) -> 3
(...,...) -> ...
新序列将作为键 "currentSeq" 的值存储在 stateStore 中。序列将在每条消息上递增并存储回 stateStore。
您应该使用具有已注册(可能是自定义)状态的处理器 API。
您还可以使用 process()
、transform()
或 transformValue()
将处理器 API 与 DSL 混合搭配,并引用状态存储(按名称)。
见