指示 RabbitMQ 定期重新发送未送达的消息
Instruct RabbitMQ to resend undelivered messages periodically
背景
我们正在使用 langohr 与 RabbitMQ 交互。我们尝试了两种不同的方法来让 RabbitMQ 重新发送我们的服务尚未正确处理的消息。一种可行的方法是发送 basic.nack
并将 requeue
设置为 true
,但这将立即重新发送消息,直到服务响应 basic.ack
。例如,如果服务试图将消息保存到当前已关闭(并且已关闭一段时间)的数据存储区,这会有点问题。对我们来说,每隔 20 秒左右获取一次未传递的消息会更好(即如果数据存储已关闭,我们既不会执行 basic.ack
也不会执行 basic.nack
,我们只是让消息保留在队列)。我们尝试使用 ExecutorService
来实现它,其要点是这样实现的:
(let [chan (lch/open conn)] ; We create a new channel since channels in Langohr are not thread-safe
(log/info "Triggering \"recover\" for channel" chan)
(try
(lb/recover chan)
(catch Exception e (log/error "Failed to call recover" e))
(finally (lch/close chan))))
不幸的是,这似乎不起作用(消息没有重新传递,只是保留在队列中)。如果我们重新启动服务,排队的消息将被正确使用。然而,我们还有其他使用 spring-rabbitmq (in Java) and they seem to be taking care of this out of the box. I've tried looking in the source code 实现的服务来弄清楚它们是如何做到的,但我还没有设法做到。
问题
您如何指示 RabbitMQ 定期(重新)传送队列中的消息(最好使用 Langohr)?
我不确定您在使用 Spring AMQP 应用程序做什么,但是 RabbitMQ 中没有为此内置任何内容。
但是,使用 TTL 设置 dead-lettering 在一段时间后重新排队回到原始队列非常容易。有关示例、链接等,请参阅 。
编辑
然而,Spring AMQP 有 retry interceptor which can be configured to suspend the consumer thread for some period(s) during retry.
状态重试拒绝并重新排队;无状态重试在内部处理重试,并且在重试期间不与代理交互。
请参阅 ,其中有说明:我们 Nack 消息,nack 将消息放入保持队列 N 秒,然后它 TTL 离开该队列并进入另一个队列,再将其放回原始队列。
设置需要一些工作,但效果很好!
背景
我们正在使用 langohr 与 RabbitMQ 交互。我们尝试了两种不同的方法来让 RabbitMQ 重新发送我们的服务尚未正确处理的消息。一种可行的方法是发送 basic.nack
并将 requeue
设置为 true
,但这将立即重新发送消息,直到服务响应 basic.ack
。例如,如果服务试图将消息保存到当前已关闭(并且已关闭一段时间)的数据存储区,这会有点问题。对我们来说,每隔 20 秒左右获取一次未传递的消息会更好(即如果数据存储已关闭,我们既不会执行 basic.ack
也不会执行 basic.nack
,我们只是让消息保留在队列)。我们尝试使用 ExecutorService
来实现它,其要点是这样实现的:
(let [chan (lch/open conn)] ; We create a new channel since channels in Langohr are not thread-safe
(log/info "Triggering \"recover\" for channel" chan)
(try
(lb/recover chan)
(catch Exception e (log/error "Failed to call recover" e))
(finally (lch/close chan))))
不幸的是,这似乎不起作用(消息没有重新传递,只是保留在队列中)。如果我们重新启动服务,排队的消息将被正确使用。然而,我们还有其他使用 spring-rabbitmq (in Java) and they seem to be taking care of this out of the box. I've tried looking in the source code 实现的服务来弄清楚它们是如何做到的,但我还没有设法做到。
问题
您如何指示 RabbitMQ 定期(重新)传送队列中的消息(最好使用 Langohr)?
我不确定您在使用 Spring AMQP 应用程序做什么,但是 RabbitMQ 中没有为此内置任何内容。
但是,使用 TTL 设置 dead-lettering 在一段时间后重新排队回到原始队列非常容易。有关示例、链接等,请参阅
编辑
然而,Spring AMQP 有 retry interceptor which can be configured to suspend the consumer thread for some period(s) during retry.
状态重试拒绝并重新排队;无状态重试在内部处理重试,并且在重试期间不与代理交互。
请参阅
设置需要一些工作,但效果很好!