StateStore is never added on Spring Cloud
Can anyone help me add a state store on Spring Cloud?
I always get this error: "nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore myStore is not added yet."
Here is the bean definition, but it is never used:
@Bean
public StoreBuilder<KeyValueStore<String, MyData>> storeBuilder() {
    KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("mystore");
    // Assuming MyDataSerde is the nested class shown below, it has to be instantiated with new
    return Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), new StreamsSerde.MyDataSerde());
}
Here is the Serde:
public static final class MyDataSerde extends Serdes.WrapperSerde<MyData> {
    public MyDataSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(MyData.class));
    }
}
Here is the data class:
public class MyData {
    private String name;
    private String course;
}
Here are the Spring Cloud dependencies:
springBootVersion = "2.2.5.RELEASE"
set('springCloudVersion', "Hoxton.SR3")
implementation group: "org.springframework.cloud", name: "spring-cloud-stream"
implementation group: "org.springframework.cloud", name: "spring-cloud-stream-binder-kafka-streams"
implementation group: "org.springframework.cloud", name: "spring-cloud-starter-stream-kafka"
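When you have to use the lower-level Processor or Transformer API, you need to add the state store like this. Have you tried passing the state store name to the process or transform method call? Here is a working test. Look at the process call and how the state store is passed to it.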
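To make the rule behind that error concrete, here is a minimal plain-Java sketch of the check, written so it runs without Kafka on the classpath. It is a model only, not Kafka Streams code: TopologySketch, TopologySketchException and tryWire are hypothetical names invented for illustration. In Kafka Streams the corresponding calls are StreamsBuilder.addStateStore(storeBuilder) and KStream.process(supplier, "storeName"). The sketch also shows that store names are matched exactly, including case. The bean in the question registers "mystore" while the error message mentions "myStore", so that mismatch may be worth ruling out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the topology check. All names are hypothetical;
// none of these classes belong to Kafka Streams or Spring.
class StateStoreWiringSketch {

    /** Stands in for the "is not added yet" failure raised while building a topology. */
    static class TopologySketchException extends RuntimeException {
        TopologySketchException(String message) {
            super(message);
        }
    }

    static class TopologySketch {
        private final Set<String> storeNames = new HashSet<>();
        private final Map<String, String> processorToStore = new HashMap<>();

        // Counterpart of registering a StoreBuilder on the builder.
        TopologySketch addStateStore(String storeName) {
            storeNames.add(storeName);
            return this;
        }

        // Counterpart of process(supplier, storeName): the name must already be registered.
        TopologySketch process(String processorName, String storeName) {
            if (!storeNames.contains(storeName)) {
                throw new TopologySketchException(
                        "Invalid topology: StateStore " + storeName + " is not added yet.");
            }
            processorToStore.put(processorName, storeName);
            return this;
        }

        String storeFor(String processorName) {
            return processorToStore.get(processorName);
        }
    }

    /** Returns null when wiring succeeds, otherwise the error message. */
    static String tryWire(String registeredName, String requestedName) {
        try {
            new TopologySketch()
                    .addStateStore(registeredName)
                    .process("my-processor", requestedName);
            return null;
        } catch (TopologySketchException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Same name on both sides: wiring succeeds, prints "null".
        System.out.println(tryWire("myStore", "myStore"));
        // Registered as "mystore" but requested as "myStore": prints the error message.
        System.out.println(tryWire("mystore", "myStore"));
    }
}
```

The second call fails only because of the capital S, so the name given to Stores.persistentKeyValueStore(...) and the name passed to process or transform need to be the same string.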
I found a solution in this post that adds the store programmatically:
public void initializeStateStores() throws Exception {
    // The "&" prefix asks Spring for the FactoryBean itself rather than the object it produces
    StreamsBuilderFactoryBean streamsBuilderFactoryBean =
            applicationContext.getBean("&stream-builder-requestListener", StreamsBuilderFactoryBean.class);
    StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
    StoreBuilder<KeyValueStore<String, Long>> keyValueStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(stateStoreName), Serdes.String(), Serdes.Long());
    // Register the store on the builder so processors can refer to it by name
    streamsBuilder.addStateStore(keyValueStoreBuilder);
}