在 Kafka Streams 上写入 GlobalStateStore

Write to GlobalStateStore on Kafka Streams

我正在尝试在 Kafka DSL 上使用 addGlobalStore,其中需要存储一些值,我需要全局访问所有 threads/instances。

我的问题是我需要定期更新我的拓扑中的这些值并让所有 运行 线程知道新值。

我通过 builder.addGlobalStore 初始化了全局存储,并使用处理器的 init() 函数作为该函数的最后一个参数,但我找不到更新内部值的方法全球商店。

我的拓扑结构的下一步是一个 Transformer,我可以在其中通过全局 Store 上的 ```init()`` 获得一个钩子并读取存储的值,但不幸的是我无法在全局更新它们。我的意思是我可以更新 运行 线程的本地副本,但其他 threads/instances 看不到更改。

我在某处读到这不能在 Transformer 上完成,但即使我使用处理器,问题仍然存在

那么,有没有办法在 Kafka DSL 拓扑上更新 globalStateStore, 如果是这样,这怎么可能?或者为了使用全局存储,我需要使用低级处理器 API ?

如果它符合您的需要,您可能可以使用 GlobalKTable 而不是 GlobalStore

I initialized the global store through builder.addGlobalStore and using the init() function of a Processor that was used as the last argument on this function, but I cannot find a way to update the values inside the global store.

您不能直接更新全局商店。相反,您必须更新(= 向其写入消息)该全局存储的基础主题。