Error while starting Spring Boot Kafka project
I get an error while starting a Spring Boot Kafka project.
Spring Boot: 2.1.2.RELEASE
Spring Kafka version: 2.2.5.RELEASE
Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE
Consumer config
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
Kafka listener container factory
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setRetryTemplate(retryTemplate);
        factory.setRecoveryCallback(context -> {
            log.error("Maximum retry policy has been reached {}", context.getAttribute("record"));
            Acknowledgment ack = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
            ack.acknowledge();
            return null;
        });
        factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
        return factory;
    }
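For the MANUAL_IMMEDIATE ack mode (essentially, for any manual mode), the ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG consumer property must be turned off.
That is the cause of this exception.
I guess you can simply not use spring.kafka.consumer.enableAutoCommit in application.properties.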
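Since the question builds its consumer properties by hand, one way to turn auto commit off is to set the property explicitly in that map. The Kafka client's own default for enable.auto.commit is true, so leaving the key out of a hand-built map is probably not enough on these versions. The sketch below uses only the JDK so that it runs without Kafka on the classpath: the string keys are the values of the corresponding ConsumerConfig constants, and the class name ConsumerConfigSketch, the method parameters, and the sample broker address and group id are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerConfigSketch {

    // Builds the same consumer properties as the question, plus the auto-commit switch.
    // In the real bean, each string key would be the matching ConsumerConfig constant.
    static Map<String, Object> consumerConfigs(String bootstrapServer, String consumerGroup) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServer);   // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("group.id", consumerGroup);              // ConsumerConfig.GROUP_ID_CONFIG
        props.put("session.timeout.ms", 30000);            // ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
        props.put("key.deserializer",                      // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",                    // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Manual ack modes require auto commit to be off.
        props.put("enable.auto.commit", false);            // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and group id, for demonstration only.
        Map<String, Object> props = consumerConfigs("localhost:9092", "demo-group");
        System.out.println("enable.auto.commit=" + props.get("enable.auto.commit"));
    }
}
```

In the original consumerConfigs() bean, the equivalent change would be the single line props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); placed before return props;.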