Spring Kafka @KafkaListener - 重试发送失败的消息并手动提交偏移量
Spring Kafka @KafkaListener - Retry sending failed messages and manually commit the offset
我正在使用 @KafkaListener 从 kafka 主题中消费,我有一个应用程序逻辑来处理不同消费者组中的多个消费者的每条记录。
现在我的问题陈述是-我必须将消费的消息发送到第三方休息端点。
如果消息发送到rest-end点失败,我不应该提交偏移量,需要根据可配置的次数重试发送消息。
在可配置的重试次数之后,如果发送消息失败。我需要记录异常的原因并提交偏移量。
我正在使用 spring kafka 2.2.0。下面是我手动确认的配置
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual-immediate
logging.level.org.springframework.kafka=debug
到目前为止,我可以使用不同消费者发布到主题中的消息。并且还能够手动确认来自不同消费者的偏移量。例如
@KafkaListener(topics = "testTopic",groupId="group-1")
public void consumerOne(ConsumerRecord<String, String> record,Acknowledgment ack) {
logger.info("OF PayLoad:{}", record);
logger.info("OF value:"+record.value());
logger.info("OF Offset:{}",record.offset());
ack.acknowledge();
}
@KafkaListener(topics = "testTopic",groupId="group-2")
public void consumerTwo(ConsumerRecord<String, String> record,Acknowledgment ack) {
logger.info("MF PayLoad_2:{}", record);
logger.info("MF value_2:"+record.value());
logger.info("MF Offset_2:{}",record.offset());
//ack.acknowledge();
}
此处,consumerTwo 未确认消息。我想根据可配置的时间重试将所有消息再次发送给消费者。
任何示例或建议都会有很大帮助。
参见。
在 2.2 中,我们添加了向 SeekToCurrentErrorHandler
添加恢复器的功能,可以执行一些操作。
我最后评论中的拉取请求显示了如果恢复者成功恢复了消息,如何提交偏移量。这将在周四的 2.2.4 版本中发布。
对于您的情况,恢复器可以调用 REST 服务并在失败时抛出异常。
我正在使用 @KafkaListener 从 kafka 主题中消费,我有一个应用程序逻辑来处理不同消费者组中的多个消费者的每条记录。
现在我的问题陈述是-我必须将消费的消息发送到第三方休息端点。 如果消息发送到rest-end点失败,我不应该提交偏移量,需要根据可配置的次数重试发送消息。 在可配置的重试次数之后,如果发送消息失败。我需要记录异常的原因并提交偏移量。
我正在使用 spring kafka 2.2.0。下面是我手动确认的配置
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual-immediate
logging.level.org.springframework.kafka=debug
到目前为止,我可以使用不同消费者发布到主题中的消息。并且还能够手动确认来自不同消费者的偏移量。例如
@KafkaListener(topics = "testTopic",groupId="group-1")
public void consumerOne(ConsumerRecord<String, String> record,Acknowledgment ack) {
logger.info("OF PayLoad:{}", record);
logger.info("OF value:"+record.value());
logger.info("OF Offset:{}",record.offset());
ack.acknowledge();
}
@KafkaListener(topics = "testTopic",groupId="group-2")
public void consumerTwo(ConsumerRecord<String, String> record,Acknowledgment ack) {
logger.info("MF PayLoad_2:{}", record);
logger.info("MF value_2:"+record.value());
logger.info("MF Offset_2:{}",record.offset());
//ack.acknowledge();
}
此处,consumerTwo 未确认消息。我想根据可配置的时间重试将所有消息再次发送给消费者。
任何示例或建议都会有很大帮助。
参见
在 2.2 中,我们添加了向 SeekToCurrentErrorHandler
添加恢复器的功能,可以执行一些操作。
我最后评论中的拉取请求显示了如果恢复者成功恢复了消息,如何提交偏移量。这将在周四的 2.2.4 版本中发布。
对于您的情况,恢复器可以调用 REST 服务并在失败时抛出异常。