如何收到有关 GlobalKTable 状态存储更新的通知?
How to be notified about updates to state store of GlobalKTable?
我只是使用 API 的 StreamsBuilder 来构建 GlobalKTable,如下所示:
Materialized<Long, Category, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<Long, Category, KeyValueStore<Bytes, byte[]>>as(this.categoryStoreName)
.withCachingDisabled()
.withKeySerde(Serdes.Long())
.withValueSerde(CATEGORY_JSON_SERDE);
return streamsBuilder.globalTable(categoryTopic, materialized);
我希望收到它的更改通知。它很少更新,如果更新我想触发缓存失效。卡夫卡这样做的方式是什么?
GlobalKTable 不支持此功能。但是,您可以使用 "global store" 并实现您的自定义 Processor
,每次更新都会调用它。
在内部,GlobalKTable
使用 "global store" 并为您提供 Processor
实现。
您可以通过 StreamsBuilder#addGlobalStore()
添加全球商店。
我只是使用 API 的 StreamsBuilder 来构建 GlobalKTable,如下所示:
Materialized<Long, Category, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<Long, Category, KeyValueStore<Bytes, byte[]>>as(this.categoryStoreName)
.withCachingDisabled()
.withKeySerde(Serdes.Long())
.withValueSerde(CATEGORY_JSON_SERDE);
return streamsBuilder.globalTable(categoryTopic, materialized);
我希望收到它的更改通知。它很少更新,如果更新我想触发缓存失效。卡夫卡这样做的方式是什么?
GlobalKTable 不支持此功能。但是,您可以使用 "global store" 并实现您的自定义 Processor
,每次更新都会调用它。
在内部,GlobalKTable
使用 "global store" 并为您提供 Processor
实现。
您可以通过 StreamsBuilder#addGlobalStore()
添加全球商店。