如何在 Spring Boot 中延迟加载 Kafka Consumer?
How to load Kafka Consumer lazily in Spring boot?
我想通过命令行参数提供组 ID,但是当我尝试这样做时出现以下错误。
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
这意味着在加载 @KafkaListener 时就需要 group.id。如果我在 consumerConfigs 配置中直接提供 groupId,那么它可以正常工作。
那么,有没有办法让我通过命令行提供组 ID,并让 Kafka 侦听器延迟加载,这样就不需要在程序启动时就提供组 ID?
我的消费者配置:
/**
 * Spring configuration for the Kafka consumer side of the application.
 *
 * <p>Exposes three beans: the raw consumer property map, a
 * {@link ConsumerFactory} built from that map, and the listener container
 * factory that is handed the consumer factory.
 */
@Configuration
class KafkaConsumerConfig {

    // One logger per class, not per instance; it is never reassigned.
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    /** Broker list, resolved from the {@code kafka.bootstrap-servers} property. */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Source of the Kafka group id (read via {@code getKafkaGroupId()}). */
    @Autowired
    private ArgumentModel argumentModel;

    /**
     * Builds the consumer properties: bootstrap servers, String key/value
     * deserializers and, when one is available, the consumer group id.
     *
     * @return a fresh mutable map of Kafka consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        logger.info("bootstrapServers : {}", bootstrapServers);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Only register the group id when one was actually supplied. Storing a
        // null value is never useful; leaving the key absent lets the group id
        // come from the other places the startup check looks at (container
        // properties or the @KafkaListener annotation).
        Object groupId = argumentModel.getKafkaGroupId();
        if (groupId != null) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        }
        return props;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a {@link DefaultKafkaConsumerFactory} for String keys and values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Creates the listener container factory and wires it to
     * {@link #consumerFactory()}.
     *
     * @return the configured concurrent listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
@KafkaListener(... groupId = "${group.id}")
然后在命令行中以 JVM 系统属性的形式传递 -Dgroup.id=myGroup(例如 java -Dgroup.id=myGroup -jar app.jar)。
我想通过命令行参数提供组 ID,但是当我尝试这样做时出现以下错误。
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
这意味着在加载 @KafkaListener 时就需要 group.id。如果我在 consumerConfigs 配置中直接提供 groupId,那么它可以正常工作。
那么,有没有办法让我通过命令行提供组 ID,并让 Kafka 侦听器延迟加载,这样就不需要在程序启动时就提供组 ID?
我的消费者配置:
/**
 * Spring configuration for the Kafka consumer side of the application.
 *
 * <p>Exposes three beans: the raw consumer property map, a
 * {@link ConsumerFactory} built from that map, and the listener container
 * factory that is handed the consumer factory.
 */
@Configuration
class KafkaConsumerConfig {

    // One logger per class, not per instance; it is never reassigned.
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    /** Broker list, resolved from the {@code kafka.bootstrap-servers} property. */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Source of the Kafka group id (read via {@code getKafkaGroupId()}). */
    @Autowired
    private ArgumentModel argumentModel;

    /**
     * Builds the consumer properties: bootstrap servers, String key/value
     * deserializers and, when one is available, the consumer group id.
     *
     * @return a fresh mutable map of Kafka consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        logger.info("bootstrapServers : {}", bootstrapServers);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Only register the group id when one was actually supplied. Storing a
        // null value is never useful; leaving the key absent lets the group id
        // come from the other places the startup check looks at (container
        // properties or the @KafkaListener annotation).
        Object groupId = argumentModel.getKafkaGroupId();
        if (groupId != null) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        }
        return props;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a {@link DefaultKafkaConsumerFactory} for String keys and values
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Creates the listener container factory and wires it to
     * {@link #consumerFactory()}.
     *
     * @return the configured concurrent listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
@KafkaListener(... groupId = "${group.id}")
然后在命令行中以 JVM 系统属性的形式传递 -Dgroup.id=myGroup(例如 java -Dgroup.id=myGroup -jar app.jar)。