如何知道消息是否已被确认/取消?
How to know if a message has been ack-ed / nack-ed?
我正在尝试使用 RabbitMQ 和 Spring Boot 来了解消息何时被接受 (ack) 或未被接受 (nack)。
我想将消息发送到队列(通过交换)并检查队列是否已接受该消息。实际上我想发送到两个不同的队列,但这并不重要,我假设它是否适用于其中一个队列也适用于另一个队列。
所以我使用 CorrelationData
尝试过这样的事情:
public boolean sendMessage(...) {
CorrelationData cd = new CorrelationData();
this.rabbitTemplate.convertAndSend(exchange, routingKey, message, cd);
try {
return cd.getFuture().get(3, TimeUnit.SECONDS).isAck();
} catch (InterruptedException | ExecutionException | TimeoutException e ) {
e.printStackTrace();
return false;
}
}
行 cd.getFuture().get(3, TimeUnit.SECONDS).isAck()
应该得到 false
是价值没有被 ack
进入我认为的队列。但这总是正确的,即使 routingKey
不存在。
所以我假设这段代码正在检查消息是否已发送到 exchange
并且 exchange
说“是的,我已经收到消息,它还没有被路由,但我已经收到了。
所以,我已经在 Rabbit/Spring 文档中寻找其他方法,但我找不到方法。
并且,再多解释一下,我想要的是:
进入Spring开机代码我收到一个消息。此消息必须发送给其他 queues/exchange,但不能从当前队列中删除(即 acked),直到其他两个队列确认 ack
。
我有手动确认和一些伪代码我有这个:
@RabbitListener(queues = {queue})
public void receiveMessageFromDirect(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag){
boolean sendQueue1 = sendMessage(...);
boolean sendQueue2 = sendMessage(...);
if(sendQueue1 && sendQueue2){
//both messages has been readed; now I can ack this message
channel.basicAck(tag, false);
}else{
//nacked; I can't remove the message util both queue ack the message
channel.basicNack(tag,false,true);
}
我已经测试了这个结构,即使队列不存在,值 sendQueue1
和 sendQueue2
总是正确的。
确认为真;即使对于无法路由的消息(我不完全确定为什么)。
您需要启用返回的消息(并在未来完成后检查它在 CorrelationData
中是否为 null - correlationData.getReturnedMessage()
)。如果它不为空,则消息无法路由到任何队列。
如果代理中存在错误,或者如果您使用的队列具有 x-max-length
和溢出行为 reject-publish
。
,您只会得到 nacks
我正在尝试使用 RabbitMQ 和 Spring Boot 来了解消息何时被接受 (ack) 或未被接受 (nack)。
我想将消息发送到队列(通过交换)并检查队列是否已接受该消息。实际上我想发送到两个不同的队列,但这并不重要,我假设它是否适用于其中一个队列也适用于另一个队列。
所以我使用 CorrelationData
尝试过这样的事情:
public boolean sendMessage(...) {
CorrelationData cd = new CorrelationData();
this.rabbitTemplate.convertAndSend(exchange, routingKey, message, cd);
try {
return cd.getFuture().get(3, TimeUnit.SECONDS).isAck();
} catch (InterruptedException | ExecutionException | TimeoutException e ) {
e.printStackTrace();
return false;
}
}
行 cd.getFuture().get(3, TimeUnit.SECONDS).isAck()
应该得到 false
是价值没有被 ack
进入我认为的队列。但这总是正确的,即使 routingKey
不存在。
所以我假设这段代码正在检查消息是否已发送到 exchange
并且 exchange
说“是的,我已经收到消息,它还没有被路由,但我已经收到了。
所以,我已经在 Rabbit/Spring 文档中寻找其他方法,但我找不到方法。
并且,再多解释一下,我想要的是:
进入Spring开机代码我收到一个消息。此消息必须发送给其他 queues/exchange,但不能从当前队列中删除(即 acked),直到其他两个队列确认 ack
。
我有手动确认和一些伪代码我有这个:
@RabbitListener(queues = {queue})
public void receiveMessageFromDirect(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag){
boolean sendQueue1 = sendMessage(...);
boolean sendQueue2 = sendMessage(...);
if(sendQueue1 && sendQueue2){
//both messages has been readed; now I can ack this message
channel.basicAck(tag, false);
}else{
//nacked; I can't remove the message util both queue ack the message
channel.basicNack(tag,false,true);
}
我已经测试了这个结构,即使队列不存在,值 sendQueue1
和 sendQueue2
总是正确的。
确认为真;即使对于无法路由的消息(我不完全确定为什么)。
您需要启用返回的消息(并在未来完成后检查它在 CorrelationData
中是否为 null - correlationData.getReturnedMessage()
)。如果它不为空,则消息无法路由到任何队列。
如果代理中存在错误,或者如果您使用的队列具有 x-max-length
和溢出行为 reject-publish
。