如何在 DSL 中使用 KeyValueStore 状态存储?

How to use a KeyValueStore state store in DSL?

KeyValueStore<String, Long> kvStore=(KeyValueStore<String, Long>) 
Stores.create("InterWindowStore1").withKeys(Serdes.String())
                .withValues(Serdes.Long())
                .persistent()
                .build().get();` 

我已经按照上面的代码创建了 statestore 并尝试插入 kvStore.put(key, value); 但它让我感到 NPE

Caused by: java.lang.NullPointerException
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.access0(CachingWindowStore.java:34)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.apply(CachingWindowStore.java:86)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:95)

正如您在评论中所述,您基本上是在进行 window 聚合:

KStream stream = ...
KTable table = stream.groupByKey().aggregate(..., TimeWindow.of(...));

由于您 KTable 流可能包含您 window 聚合的更新,因此您想要修改此流。为此,您可以使用状态转换器或值转换器:

StateStoreSupplier myState = State.create("nameOfMyState")....;

KStream result = table.toStream().transform(..., "nameOfMyState");

最后,您可以将结果写入输出主题:

result.to("output-topic");

您提供给 transformTransformer 可以通过 init() 中的给定上下文获取状态,并在 transform() 中每次使用 window输出是 generated/updated.