How to load a Kafka consumer lazily in Spring Boot?

I want to supply the group ID through a command-line argument, but when I try to do so I get the following error.

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

This means the group ID is required at the moment the @KafkaListener is loaded. If I provide the groupId directly in the consumer config file, it works fine.

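So is there any way to provide the group ID on the command line and have the Kafka listener load lazily, so that the group ID is not required at application startup?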

My consumer configuration:

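import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
// Logger imports assume SLF4J, which Spring Boot uses by default.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Application-specific holder for the parsed command-line arguments.
    @Autowired
    private ArgumentModel argumentModel;

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        logger.info("bootstrapServers : {}", bootstrapServers);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // The group ID must already be known when this bean is created at startup.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, argumentModel.getKafkaGroupId());
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}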
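
Use a property placeholder for the group ID in the listener annotation:

@KafkaListener(... groupId = "${group.id}")

Then pass -Dgroup.id=myGroup as a JVM option on the command line.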
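
To make the mechanics of the -D flag concrete, here is a minimal plain-JDK sketch of what the placeholder lookup amounts to: the JVM exposes -Dgroup.id=myGroup as a system property, and Spring's Environment includes system properties when it resolves ${group.id}. The class name GroupIdProps and the method consumerProps are hypothetical names used only for illustration. The string keys are the literal values behind the ConsumerConfig constants used in the question, and the fail-fast check is an added assumption rather than part of the original answer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring or Kafka: it only illustrates
// how a value passed with -Dgroup.id=... can reach the consumer properties.
final class GroupIdProps {

    private GroupIdProps() {
    }

    // Builds a consumer property map, reading group.id from the JVM
    // system properties instead of an application-specific argument model.
    static Map<String, Object> consumerProps(String bootstrapServers) {
        String groupId = System.getProperty("group.id");
        if (groupId == null || groupId.trim().isEmpty()) {
            // Fail early with a clearer message than the container's own error.
            throw new IllegalStateException(
                    "Missing group id: start the JVM with -Dgroup.id=<name>");
        }
        Map<String, Object> props = new HashMap<>();
        // Same keys as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and friends.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", groupId);
        return props;
    }
}
```

When launching a packaged application, the -D option has to come before -jar (for example, java -Dgroup.id=myGroup -jar app.jar, where app.jar is a placeholder name); anything placed after the jar name is passed to the program as an ordinary argument instead of being set as a system property.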