如何在同一主题上使用 globalKtable 和 StateStore?
how to use globalKtable and StateStore on the same topic?
澄清一下,我是 Kafka 的新手,如果我的问题似乎没有记录,我很抱歉,我正在阅读教程、文档和我能理解的一切。
我正在尝试从 GlobalStore 读取所有值以更新它的值,然后使用已经存在的 StateStore 来放置这些新的更新值。
我正在尝试这样做,因为当我这样做时:
this.stateStore.all();
我只有1/10的数据,如果我没有理解错的话,这是因为我有10个分区,而ss只读了一个(虽然我不太明白为什么)
这是我的 globalTable :
public StreamsBuilder declareTopology(StreamsBuilder builder) {
logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
getInputTopic(),
getDataTopic(),
getToEsTopic());
builder.globalTable(
getDataTopic(),
Consumed.with(Serdes.String(), fooSerdes)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
"foosktable")
.withKeySerde(Serdes.String())
.withValueSerde(fooSerdes)
.withLoggingEnabled(new HashMap<>()));
...
这是 addStateStore,我无法删除它,因为它在代码的其他地方使用过:
...
builder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("foosktable"),
Serdes.String(),
fooSerdes));
...
return builder;
}
所以,理论上,我想做的是删除也使用相同主题的 StateStore,并使用我的 data.process 主题之一放置我的数据,问题是这个处理器确实这个 StateStore 的其他东西,所以我不能核对它。
我在这里迷路了,任何光线都会有很大帮助。谢谢!
有点不清楚您实际尝试实现的目标。但是,一些高级解释:
A GlobalKTable
只有一个目的:在不修改主题的情况下读取数据,以允许执行 KStream-GlobalKTable
-join 或通过 "interactive queries" 查询商店。
因此,您无法真正按照您的意愿进行操作,因为无法按照您的意图将数据从全局存储复制到另一个存储。您需要复制输入主题并阅读两次:(1) as GlobalKTable
和 (2) as regular KStream
在将数据放入商店之前修改数据。对于 (2),您可以使用 transform()
.
希望这对您有所帮助。
澄清一下,我是 Kafka 的新手,如果我的问题似乎没有记录,我很抱歉,我正在阅读教程、文档和我能理解的一切。
我正在尝试从 GlobalStore 读取所有值以更新它的值,然后使用已经存在的 StateStore 来放置这些新的更新值。
我正在尝试这样做,因为当我这样做时:
this.stateStore.all();
我只有1/10的数据,如果我没有理解错的话,这是因为我有10个分区,而ss只读了一个(虽然我不太明白为什么)
这是我的 globalTable :
public StreamsBuilder declareTopology(StreamsBuilder builder) {
logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
getInputTopic(),
getDataTopic(),
getToEsTopic());
builder.globalTable(
getDataTopic(),
Consumed.with(Serdes.String(), fooSerdes)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
"foosktable")
.withKeySerde(Serdes.String())
.withValueSerde(fooSerdes)
.withLoggingEnabled(new HashMap<>()));
...
这是 addStateStore,我无法删除它,因为它在代码的其他地方使用过:
...
builder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("foosktable"),
Serdes.String(),
fooSerdes));
...
return builder;
}
所以,理论上,我想做的是删除也使用相同主题的 StateStore,并使用我的 data.process 主题之一放置我的数据,问题是这个处理器确实这个 StateStore 的其他东西,所以我不能核对它。
我在这里迷路了,任何光线都会有很大帮助。谢谢!
有点不清楚您实际尝试实现的目标。但是,一些高级解释:
A GlobalKTable
只有一个目的:在不修改主题的情况下读取数据,以允许执行 KStream-GlobalKTable
-join 或通过 "interactive queries" 查询商店。
因此,您无法真正按照您的意愿进行操作,因为无法按照您的意图将数据从全局存储复制到另一个存储。您需要复制输入主题并阅读两次:(1) as GlobalKTable
和 (2) as regular KStream
在将数据放入商店之前修改数据。对于 (2),您可以使用 transform()
.
希望这对您有所帮助。