如何在 Spring Cloud Stream Kafka Streams 应用程序中使用 StateStoreBuilder 添加 StateStore

How to add a StateStore using StateStoreBuilder in a Spring Cloud Stream Kafka Streams application

原生 Kafka API 允许我们 create and add a state store using the StreamsBuilder:

    final StreamsBuilder builder = new StreamsBuilder();
    ...
    final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
            Stores.persistentWindowStore(storeName,
                                         retentionPeriod,
                                         windowSize,
                                         false
            ),
            Serdes.String(),
            Serdes.Long());

    builder.addStateStore(dedupStoreBuilder);

我想使用 Spring Cloud Streams 做同样的事情,但无法找到访问 StreamsBuilder 以添加商店的方法。

我已尝试检索 doc 中所述的 StreamsBuilderFactoryBean,希望我可以从中获取 StreamsBuilder 对象,但该 bean 似乎没有可用:

@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val context: ApplicationContext) {

    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {

        val streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean::class.java)
        ...
        return xxx

    }

}

Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'stream-builder-process' available

无论如何,我什至不确定这样做是否正确。那么,我们如何以编程方式创建 StateStore?

由于我的 Scs 版本 (Fishtown SR3),我没有看到记录的过程,但好消息是自 Germantown 以来可以声明式地创建 State Store:

const val DEDUP_STORE = "dedup-store"

@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration {

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
        return input.transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)

    }

}