Spring Kafka 在侦听器中按条件丢弃消息
Spring Kafka discard message by condition in listener
在我的 Spring Boot/Kafka 项目中,我有以下侦听器:
@KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {
// do some logic
ack.acknowledge();
}
在侦听器内部,我需要根据我的业务逻辑检查一些条件,如果不满足 - 跳过对该特定消息的处理并让 Kafka 知道再次重新传递该消息。
我需要这个的原因 - 根据我的应用程序的业务逻辑,我需要避免每秒向特定的 Telegram 聊天发送超过一个 post。这就是为什么我想检查 Kafka 侦听器中的 chatLastSent 时间和 postpone 消息发送(如果需要)(通过消息重新传递到此 Kafka 主题)
如何正确操作?这次我只需要不执行 ack.acknowledge();
还是有另一种更合适的方法来实现它?
您可以使用 RecordFilterStrategy。
当您抛出异常时,容器将调用错误处理程序,该处理程序将重新查找未处理的消息,以便在下一次轮询时再次获取它们。
在我的 Spring Boot/Kafka 项目中,我有以下侦听器:
@KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {
// do some logic
ack.acknowledge();
}
在侦听器内部,我需要根据我的业务逻辑检查一些条件,如果不满足 - 跳过对该特定消息的处理并让 Kafka 知道再次重新传递该消息。
我需要这个的原因 - 根据我的应用程序的业务逻辑,我需要避免每秒向特定的 Telegram 聊天发送超过一个 post。这就是为什么我想检查 Kafka 侦听器中的 chatLastSent 时间和 postpone 消息发送(如果需要)(通过消息重新传递到此 Kafka 主题)
如何正确操作?这次我只需要不执行 ack.acknowledge();
还是有另一种更合适的方法来实现它?
您可以使用 RecordFilterStrategy。
当您抛出异常时,容器将调用错误处理程序,该处理程序将重新查找未处理的消息,以便在下一次轮询时再次获取它们。