使用@KafkaListener 时以编程方式设置属性
Programmatically setting properties when using @KafkaListener
我在我的 spring 引导应用程序中使用 @KafkaListener
注释,但没有创建自定义 KafkaListenerContainerFactory
bean。我目前正在我的 application.yml 文件中设置 spring.kafka.consumer.value-deserializer
属性 来分配我的反序列化器,并且更喜欢以编程方式执行此操作以进行编译时检查。我意识到创建我自己的 KafkaListenerContainerFactory
允许我在工厂设置这个 属性,但我想避免样板和一些额外的复杂性来获得 SSL 设置。
有没有一种简单的方法可以在不创建我自己的情况下以编程方式设置我的值反序列化器 KafkaListenerContainerFactory
?
反序列化器继续用于创建消费者工厂的属性,而不是容器工厂,您可以按如下方式覆盖启动的消费者工厂:
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> props = properties.buildConsumerProperties();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
根据docs
您可以像这样设置其他属性:
@KafkaListener(topics = "test-transactional",
properties={"foo:bar","isolation.level:read_committed"})
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
logger.info("got consumer record" + cr.toString());
}
@KafkaListener(topics = "test-transactional",
properties={"isolation.level:read_uncommitted"})
public void listenDifferent(ConsumerRecord<?, ?> cr) throws Exception {
logger.info("uncomitted: got consumer record" + cr.toString());
}
我在我的 spring 引导应用程序中使用 @KafkaListener
注释,但没有创建自定义 KafkaListenerContainerFactory
bean。我目前正在我的 application.yml 文件中设置 spring.kafka.consumer.value-deserializer
属性 来分配我的反序列化器,并且更喜欢以编程方式执行此操作以进行编译时检查。我意识到创建我自己的 KafkaListenerContainerFactory
允许我在工厂设置这个 属性,但我想避免样板和一些额外的复杂性来获得 SSL 设置。
有没有一种简单的方法可以在不创建我自己的情况下以编程方式设置我的值反序列化器 KafkaListenerContainerFactory
?
反序列化器继续用于创建消费者工厂的属性,而不是容器工厂,您可以按如下方式覆盖启动的消费者工厂:
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> props = properties.buildConsumerProperties();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
根据docs
您可以像这样设置其他属性:
@KafkaListener(topics = "test-transactional",
properties={"foo:bar","isolation.level:read_committed"})
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
logger.info("got consumer record" + cr.toString());
}
@KafkaListener(topics = "test-transactional",
properties={"isolation.level:read_uncommitted"})
public void listenDifferent(ConsumerRecord<?, ?> cr) throws Exception {
logger.info("uncomitted: got consumer record" + cr.toString());
}