How to use a KeyValueStore state store in DSL?
KeyValueStore<String, Long> kvStore = (KeyValueStore<String, Long>)
    Stores.create("InterWindowStore1").withKeys(Serdes.String())
        .withValues(Serdes.Long())
        .persistent()
        .build().get();
I created the state store with the code above and tried to insert with kvStore.put(key, value); but it throws an NPE:
Caused by: java.lang.NullPointerException
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$000(CachingWindowStore.java:34)
at org.apache.kafka.streams.state.internals.CachingWindowStore.apply(CachingWindowStore.java:86)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:95)
As you stated in the comments, you are basically doing a windowed aggregation:
KStream stream = ...
KTable table = stream.groupByKey().aggregate(..., TimeWindows.of(...));
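To make the aggregation step concrete, here is a minimal sketch using the older (pre-1.0) Kafka Streams API that this question targets; the topic name, store name, window size, and sum logic are all illustrative assumptions:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KStreamBuilder builder = new KStreamBuilder();
// Hypothetical input topic with String keys and Long values
KStream<String, Long> stream =
    builder.stream(Serdes.String(), Serdes.Long(), "input-topic");

// Windowed aggregation: sum values per key in 1-minute tumbling windows
KTable<Windowed<String>, Long> table = stream
    .groupByKey(Serdes.String(), Serdes.Long())
    .aggregate(
        () -> 0L,                          // initializer
        (key, value, agg) -> agg + value,  // aggregator
        TimeWindows.of(60_000L),           // window definition
        Serdes.Long(),                     // serde for the aggregate value
        "window-agg-store");               // name of the aggregation's own store
```

Note that the result is keyed by Windowed<String>, since each window produces its own aggregate per key.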
Because the KTable changelog stream contains the updates of your window aggregation, this is the stream you want to modify. For this, you can use a stateful transform() or transformValues():
StateStoreSupplier myState = Stores.create("nameOfMyState")....;
KStream result = table.toStream().transform(..., "nameOfMyState");
Finally, you can write the result to an output topic:
result.to("output-topic");
The Transformer you provide to transform() can get the store via the given context in init(), and use it in transform() each time a window output is generated/updated.
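Putting the above together, a Transformer along these lines would avoid the NPE: the key point is that the store must be looked up via the context in init(), not obtained by calling build().get() yourself (that yields a store instance Kafka Streams never initialized). The class name and the "emit the delta" logic are illustrative assumptions:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class MyTransformer
        implements Transformer<String, Long, KeyValue<String, Long>> {

    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // The store was registered under "nameOfMyState" and is
        // created/initialized by Kafka Streams; we only look it up here.
        store = (KeyValueStore<String, Long>) context.getStateStore("nameOfMyState");
    }

    @Override
    public KeyValue<String, Long> transform(String key, Long value) {
        Long previous = store.get(key);
        store.put(key, value); // safe: the store is attached and initialized
        // Illustrative: forward the change relative to the previous window result
        return KeyValue.pair(key, previous == null ? value : value - previous);
    }

    @Override
    public KeyValue<String, Long> punctuate(long timestamp) {
        return null; // old API: no periodic output needed
    }

    @Override
    public void close() {}
}
```

The store itself must also be registered on the builder (e.g. builder.addStateStore(myState)) before transform() can reference it by name.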