关于 Spring Kafka Listener Consumer Offset Acknowledgement 的问题
Question on Spring Kafka Listener Consumer Offset Acknowledgement
我已经创建了下面的消费者工厂。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setAutoStartup(autoStart);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
下面给出Kafka监听器
@KafkaListener(id= "${topic1}" ,
topics = "${topic1}",
groupId = "${consumer.group1}", concurrency = "1", containerFactory = "kafkaListenerContainerFactory")
public void consumeEvents1(String jsonObject, @Headers Map<String, String> header, Acknowledgment acknowledgment) {
LOG.info("Message - {}", jsonObject);
LOG.info(header.get(KafkaHeaders.GROUP_ID) + header.get(KafkaHeaders.RECEIVED_TOPIC)+String.valueOf(header.get(KafkaHeaders.OFFSET)));
acknowledgment.acknowledge();
}
在消费者工厂里,我没有设置factory.setBatchListener(true);
我的理解是上面的监听代码是每条消息调用一次,不是批量监听。这就是我看到的行为。在批处理侦听器中,我得到了一个消息列表,而不是逐条消息。
由于侦听器不是基于批处理的,因此 acknowledgment.acknowledge()
将具有与 MANUAL 或 [=34= 相同的行为]。这样理解对吗?
我参考了下面material。
使用MANUAL
,提交会排队直到处理完整个批次;这样效率更高,但增加了重新投递的可能性。
使用 MANUAL_IMMEDIATE
,提交会立即发生,只要您在侦听器线程上调用它即可。
我已经创建了下面的消费者工厂。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setAutoStartup(autoStart);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
下面给出Kafka监听器
@KafkaListener(id= "${topic1}" ,
topics = "${topic1}",
groupId = "${consumer.group1}", concurrency = "1", containerFactory = "kafkaListenerContainerFactory")
public void consumeEvents1(String jsonObject, @Headers Map<String, String> header, Acknowledgment acknowledgment) {
LOG.info("Message - {}", jsonObject);
LOG.info(header.get(KafkaHeaders.GROUP_ID) + header.get(KafkaHeaders.RECEIVED_TOPIC)+String.valueOf(header.get(KafkaHeaders.OFFSET)));
acknowledgment.acknowledge();
}
在消费者工厂里,我没有设置factory.setBatchListener(true);
我的理解是上面的监听代码是每条消息调用一次,不是批量监听。这就是我看到的行为。在批处理侦听器中,我得到了一个消息列表,而不是逐条消息。
由于侦听器不是基于批处理的,因此 acknowledgment.acknowledge()
将具有与 MANUAL 或 [=34= 相同的行为]。这样理解对吗?
我参考了下面material。
使用MANUAL
,提交会排队直到处理完整个批次;这样效率更高,但增加了重新投递的可能性。
使用 MANUAL_IMMEDIATE
,提交会立即发生,只要您在侦听器线程上调用它即可。