如何在 spring boot 中配置两个 Kafka StreamsBuilderFactoryBean 实例

how to configure two instances of Kafka StreamsBuilderFactoryBean in spring boot

使用 spring-boot-2.1.3,spring-kafka-2.2.4,我想有两个流配置(例如有不同的 application.id,或连接到不同的集群等)。所以我几乎根据文档定义了第一个流配置,然后添加了第二个,具有不同的名称,以及第二个 StreamsBuilderFactoryBean(也具有不同的名称):

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId1000");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    //...
    return new KafkaStreamsConfiguration(props);
}

@Bean(name = "myKappConfig")
public KafkaStreamsConfiguration myKafkaAppIdConfiguration() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId9999");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    //...
    return new KafkaStreamsConfiguration(props);
}

@Bean(name = "myKappConfigStreamBuilder")
public StreamsBuilderFactoryBean myAppStreamBuilder(
        @Qualifier("myKappConfig") KafkaStreamsConfiguration myKafkaAppIdConfiguration) {
    return new StreamsBuilderFactoryBean(myKafkaAppIdConfiguration);
}

但是,当我尝试 运行 应用程序时,我得到:

Parameter 0 of method kafkaStreamsFactoryBeanConfigurer in org.springframework.boot.autoconfigure.kafka.KafkaStreamsAnnotationDrivenConfiguration required a single bean, but 2 were found: - &defaultKafkaStreamsBuilder: defined by method 'defaultKafkaStreamsBuilder' in class path resource [org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.class] - &myKappConfigStreamBuilder: defined by method 'myAppStreamBuilder' in class path resource [com/teramedica/kafakaex001web/KafkaConfig.class]

因为 spring-boot 自动配置中的代码:

@Bean
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
        StreamsBuilderFactoryBean factoryBean) {
    return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
}

如果不能完全替换 KafkaStreamsAnnotationDrivenConfiguration,我该如何定义多个 StreamsBuilderFactoryBean。或者,如何更改给定流的属性?

@Primary 标记一个工厂 bean。