How to configure two instances of Kafka StreamsBuilderFactoryBean in Spring Boot
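Using spring-boot-2.1.3 and spring-kafka-2.2.4, I'd like to have two streams configurations (e.g. with different application.id values, or connecting to different clusters). So I defined the first streams configuration pretty much as the documentation describes, then added a second one with a different name, along with a second StreamsBuilderFactoryBean (also with a different name):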
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId1000");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //...
        return new KafkaStreamsConfiguration(props);
    }
    @Bean(name = "myKappConfig")
    public KafkaStreamsConfiguration myKafkaAppIdConfiguration() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId9999");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //...
        return new KafkaStreamsConfiguration(props);
    }
    @Bean(name = "myKappConfigStreamBuilder")
    public StreamsBuilderFactoryBean myAppStreamBuilder(
            @Qualifier("myKappConfig") KafkaStreamsConfiguration myKafkaAppIdConfiguration) {
        return new StreamsBuilderFactoryBean(myKafkaAppIdConfiguration);
    }
However, when I try to run the application, I get:
Parameter 0 of method kafkaStreamsFactoryBeanConfigurer in
org.springframework.boot.autoconfigure.kafka.KafkaStreamsAnnotationDrivenConfiguration
required a single bean, but 2 were found:
- &defaultKafkaStreamsBuilder: defined by method 'defaultKafkaStreamsBuilder' in class path resource
[org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.class]
- &myKappConfigStreamBuilder: defined by method 'myAppStreamBuilder' in class path resource
[com/teramedica/kafakaex001web/KafkaConfig.class]
This happens because of the following code in the spring-boot autoconfiguration:
    @Bean
    public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
            StreamsBuilderFactoryBean factoryBean) {
        return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
    }
Short of replacing KafkaStreamsAnnotationDrivenConfiguration entirely, how can I define more than one StreamsBuilderFactoryBean? Or, how can I change the properties for a given stream?
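Mark one of the factory beans with @Primary. Spring then prefers that bean wherever a single StreamsBuilderFactoryBean is required, which resolves the ambiguity in kafkaStreamsFactoryBeanConfigurer.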
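As a rough illustration of that suggestion, here is a minimal sketch based on the configuration from the question. It assumes @EnableKafkaStreams is what registers defaultKafkaStreamsBuilder (the error message points to KafkaStreamsDefaultConfiguration, but the question does not show the annotation), and it marks the custom factory bean as primary because that is the only one declared in the question's own code. The class name KafkaConfig is taken from the error output; everything else is unchanged from the question.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.StreamsConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

@Configuration
@EnableKafkaStreams // assumption: this is how defaultKafkaStreamsBuilder gets registered
public class KafkaConfig {

    // Configuration picked up by the framework-provided defaultKafkaStreamsBuilder.
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId1000");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaStreamsConfiguration(props);
    }

    // Configuration for the second, independently named streams application.
    @Bean(name = "myKappConfig")
    public KafkaStreamsConfiguration myKafkaAppIdConfiguration() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId9999");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaStreamsConfiguration(props);
    }

    // @Primary makes this the bean chosen whenever a single
    // StreamsBuilderFactoryBean is injected without a qualifier,
    // including Spring Boot's kafkaStreamsFactoryBeanConfigurer.
    @Primary
    @Bean(name = "myKappConfigStreamBuilder")
    public StreamsBuilderFactoryBean myAppStreamBuilder(
            @Qualifier("myKappConfig") KafkaStreamsConfiguration myKafkaAppIdConfiguration) {
        return new StreamsBuilderFactoryBean(myKafkaAppIdConfiguration);
    }
}
```

One consequence of this choice is that Spring Boot's configurer is applied to the primary factory bean rather than to the default one. Beans that build a topology will likely still need @Qualifier("defaultKafkaStreamsBuilder") or @Qualifier("myKappConfigStreamBuilder") on their StreamsBuilder parameter to pick the intended builder explicitly.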