在 RabbitMQ 中手动确认消息
Manually ack messages in RabbitMQ
以前我读取队列中的所有消息,但现在我必须return根据用户选择(计数)确定特定数量的消息。
我尝试相应地更改 for 循环,但由于自动确认,它读取了所有消息。所以我尝试在配置文件中将其更改为手动。
在我的程序中如何在读取消息后手动确认消息(目前我正在使用 AmqpTemplate 接收并且我没有频道参考)?
Properties properties = admin.getQueueProperties("queue_name");
if(null != properties)
{
Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());
while(messageCount > 0)
{
Message msg = amqpTemplate.receive(queue_name);
String value = new String(msg.getBody());
valueList.add(value);
messageCount--;
}
}
非常感谢任何帮助,提前致谢。
您不能使用 receive()
方法手动确认 - 对于具有手动确认和 ChannelAwareMessageListener
的事件驱动消费者,请使用 SimpleMessageListenerContainer
。或者,使用模板的 execute()
方法,它可以让您访问 Channel
- 但是您将使用较低级别的 RabbitMQ API,而不是 Message
抽象。
编辑:
您需要学习底层的 RabbitMQ Java API 才能使用执行,但是像这样的东西会起作用...
final int messageCount = 3;
boolean result = template.execute(new ChannelCallback<Boolean>() {
@Override
public Boolean doInRabbit(final Channel channel) throws Exception {
int n = messageCount;
channel.basicQos(messageCount); // prefetch
long deliveryTag = 0;
while (n > 0) {
GetResponse result = channel.basicGet("si.test.queue", false);
if (result != null) {
System.out.println(new String(result.getBody()));
deliveryTag = result.getEnvelope().getDeliveryTag();
n--;
}
else {
Thread.sleep(1000);
}
}
if (deliveryTag > 0) {
channel.basicAck(deliveryTag, true);
}
return true;
}
});
以前我读取队列中的所有消息,但现在我必须return根据用户选择(计数)确定特定数量的消息。
我尝试相应地更改 for 循环,但由于自动确认,它读取了所有消息。所以我尝试在配置文件中将其更改为手动。
在我的程序中如何在读取消息后手动确认消息(目前我正在使用 AmqpTemplate 接收并且我没有频道参考)?
Properties properties = admin.getQueueProperties("queue_name");
if(null != properties)
{
Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());
while(messageCount > 0)
{
Message msg = amqpTemplate.receive(queue_name);
String value = new String(msg.getBody());
valueList.add(value);
messageCount--;
}
}
非常感谢任何帮助,提前致谢。
您不能使用 receive()
方法手动确认 - 对于具有手动确认和 ChannelAwareMessageListener
的事件驱动消费者,请使用 SimpleMessageListenerContainer
。或者,使用模板的 execute()
方法,它可以让您访问 Channel
- 但是您将使用较低级别的 RabbitMQ API,而不是 Message
抽象。
编辑:
您需要学习底层的 RabbitMQ Java API 才能使用执行,但是像这样的东西会起作用...
final int messageCount = 3;
boolean result = template.execute(new ChannelCallback<Boolean>() {
@Override
public Boolean doInRabbit(final Channel channel) throws Exception {
int n = messageCount;
channel.basicQos(messageCount); // prefetch
long deliveryTag = 0;
while (n > 0) {
GetResponse result = channel.basicGet("si.test.queue", false);
if (result != null) {
System.out.println(new String(result.getBody()));
deliveryTag = result.getEnvelope().getDeliveryTag();
n--;
}
else {
Thread.sleep(1000);
}
}
if (deliveryTag > 0) {
channel.basicAck(deliveryTag, true);
}
return true;
}
});