如何收到有关 GlobalKTable 状态存储更新的通知?

How to be notified about updates to state store of GlobalKTable?

我只是使用 API 的 StreamsBuilder 来构建 GlobalKTable,如下所示:

Materialized<Long, Category, KeyValueStore<Bytes, byte[]>> materialized =
    Materialized.<Long, Category, KeyValueStore<Bytes, byte[]>>as(this.categoryStoreName)
        .withCachingDisabled()
        .withKeySerde(Serdes.Long())
        .withValueSerde(CATEGORY_JSON_SERDE);

return streamsBuilder.globalTable(categoryTopic, materialized);

我希望收到它的更改通知。它很少更新,如果更新我想触发缓存失效。卡夫卡这样做的方式是什么?

GlobalKTable 不支持此功能。但是,您可以使用 "global store" 并实现您的自定义 Processor,每次更新都会调用它。

在内部,GlobalKTable 使用 "global store" 并为您提供 Processor 实现。

您可以通过 StreamsBuilder#addGlobalStore() 添加全球商店。