How to add a StateStore using StateStoreBuilder in a Spring Cloud Stream Kafka Streams application
The native Kafka Streams API lets us create and add a state store using the StreamsBuilder:
final StreamsBuilder builder = new StreamsBuilder();
...
final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
        Stores.persistentWindowStore(storeName,
                retentionPeriod,
                windowSize,
                false
        ),
        Serdes.String(),
        Serdes.Long());
builder.addStateStore(dedupStoreBuilder);
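I would like to do the same with Spring Cloud Stream, but I cannot find a way to access the StreamsBuilder in order to add the store.
I tried retrieving the StreamsBuilderFactoryBean as described in the doc, hoping I could get the StreamsBuilder object from it, but that bean does not seem to be available: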
@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val context: ApplicationContext) {
    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
        val streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean::class.java)
        ...
        return xxx
    }
}
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'stream-builder-process' available
Anyway, I am not even sure this is the right approach. So how can we create a StateStore programmatically?
Because of my SCS version (Fishtown SR3), I did not see the documented procedure, but the good news is that since Germantown a state store can be created declaratively:
const val DEDUP_STORE = "dedup-store"
@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration {
    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
        return input.transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
    }
}
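The DeduplicationTransformer itself is not shown above. As a rough illustration of what such a transformer might do with the key-value store, here is a minimal Java sketch of the deduplication logic only. It runs without Kafka: SimpleKeyValueStore, InMemoryStore and Deduplicator are hypothetical names invented for this example, standing in for the get/put part of a Kafka Streams KeyValueStore and for the body of a Transformer's transform method. The sample keys and values are made up as well.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {

    /** Hypothetical stand-in for the get/put subset of a Kafka Streams KeyValueStore. */
    interface SimpleKeyValueStore<K, V> {
        V get(K key);
        void put(K key, V value);
    }

    /** In-memory store, used only so that the sketch runs without Kafka. */
    static final class InMemoryStore<K, V> implements SimpleKeyValueStore<K, V> {
        private final Map<K, V> data = new HashMap<>();
        @Override public V get(K key) { return data.get(key); }
        @Override public void put(K key, V value) { data.put(key, value); }
    }

    /** Lets a record through only the first time its key is seen. */
    static final class Deduplicator<K, V> {
        private final SimpleKeyValueStore<K, Long> seen;

        Deduplicator(SimpleKeyValueStore<K, Long> seen) {
            this.seen = seen;
        }

        /** Returns the value for a new key, or null to signal "drop this duplicate". */
        V transform(K key, V value, long timestampMs) {
            if (seen.get(key) != null) {
                return null; // key already recorded: treat as a duplicate
            }
            seen.put(key, timestampMs); // remember when the key was first seen
            return value;
        }
    }

    /** Feeds three made-up records (one repeated key) through the deduplicator. */
    public static List<String> run() {
        Deduplicator<String, String> dedup = new Deduplicator<>(new InMemoryStore<>());
        String[][] records = {
            {"t1", "transfer-1"},
            {"t2", "transfer-2"},
            {"t1", "transfer-1"} // same key as the first record
        };
        List<String> forwarded = new ArrayList<>();
        long timestampMs = 0L;
        for (String[] record : records) {
            String out = dedup.transform(record[0], record[1], timestampMs++);
            if (out != null) {
                forwarded.add(out);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [transfer-1, transfer-2]
    }
}
```

In a real transformer the store would presumably be obtained in init via context.getStateStore(DEDUP_STORE) rather than constructed by hand, and returning null from transform means no record is forwarded downstream. This sketch never expires old keys; a window store like the one in the question would be one way to bound how long keys are kept.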