Java RabbitMQ consumer.nextMessage 总是收到相同的消息
Java RabbitMQ consumer.nextMessage always gets same message
我们在分布式服务架构中使用 Java rabbitMq 和 spring 启动。一项服务获取 HTTP 请求并将其转发到未知队列进行处理。同时它必须等待另一个队列的响应才能终止 HTTP 请求。 (这是一个由渲染器完成工作的预览请求)。
可以有多个 ServiceA(HTTP 接口)和 ServiceB(呈现器)实例,因此对于每条预览消息,我们还会发送一个唯一 ID 以用作路由键。
我在使用 BlockingConsumer 时遇到问题。每当我调用 consumer.nextMessage() 时,我都会一遍又一遍地收到相同的消息。这是双重奇怪的,因为它应该被确认并从队列中删除,而对于另一个消费者甚至不应该理会它,因为我们使用的唯一 ID 不再绑定到队列。 nextMessage 甚至 returns 在渲染器服务完成并发回其完成消息之前。
这是简化的设置:
一般
所有服务都对所有消息使用全局 DirectExchange
@Bean
public DirectExchange globalDirectExchange() {
return new DirectExchange(EXCHANGE_NAME, false, true);
}
ServiceA(处理 HTTP 请求):
private Content requestPreviewByKey(RenderMessage renderMessage, String previewKey) {
String renderDoneRoutingKey= UUID.randomUUID().toString();
renderMessage.setPreviewDoneKey(renderDoneId);
Binding binding = BindingBuilder.bind(previewDoneQueue).to(globalDirectExchange)
.with(renderDoneRoutingKey);
try {
amqpAdmin.declareBinding(binding);
rabbitProducer.sendPreviewRequestToKey(renderMessage, previewKey);
return getContentBlocking();
} catch (Exception e) {
logErrorIfDebug(type, e);
throw new ApiException(BaseErrorCode.COMMUNICATION_ERROR, "Could not render preview");
} finally {
amqpAdmin.removeBinding(binding);
}
}
private Content getContentBlocking() {
BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(rabbitMqConfig.connectionFactory(), new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<>(), AcknowledgeMode.AUTO, true, 1, PREVIEW_DONE_QUEUE);
try {
blockingQueueConsumer.start();
Message message = blockingQueueConsumer.nextMessage(waitForPreviewMs);
if (!StringUtils.isEmpty(message)) {
String result = new String(message.getBody());
return JsonUtils.stringToObject(result, Content.class);
}
throw new ApiException("Could not render preview");
} catch (Exception e) {
logError(e);
throw new ApiException("Could not render preview");
} finally {
blockingQueueConsumer.stop();
}
}
服务 B
我会为您省去大部分代码。我的日志显示一切顺利,一旦完成,服务就会将正确的消息发送到与初始呈现请求一起发送的 UUID 密钥。
public void sendPreviewDoneMessage(Content content, String previewDoneKey) {
String message = JsonUtils.objectToString(content);
rabbitTemplate.convertAndSend(globalDirectExchange, previewDoneKey, message);
}
整个过程都有效...一次...
真正的问题似乎是消费者设置。为什么我在使用 nextMessage() 时总是从队列中获取相同的(第一条)消息。
创建和删除 Bindung 是否确保在该实例中甚至只接收到绑定到该 routingKey 的消息? nextMessage() 不会确认消息并将其从队列中删除吗?!
非常感谢您的耐心等待,更要感谢您提供有用的答案!
BlockingQueueConsumer
不是设计用来直接使用的;它是 SimpleMessageListenerContainer
的一个组件,它将在消息被侦听器使用后负责确认消息(容器调用 commitIfNecessary
)。
直接使用这个消费者可能会有其他意想不到的副作用。
我强烈建议使用侦听器容器来消费消息。
如果您只想按需接收消息,请改用 RabbitTemplate
receive()
或 receiveAndConvert()
方法。
我们在分布式服务架构中使用 Java rabbitMq 和 spring 启动。一项服务获取 HTTP 请求并将其转发到未知队列进行处理。同时它必须等待另一个队列的响应才能终止 HTTP 请求。 (这是一个由渲染器完成工作的预览请求)。
可以有多个 ServiceA(HTTP 接口)和 ServiceB(呈现器)实例,因此对于每条预览消息,我们还会发送一个唯一 ID 以用作路由键。
我在使用 BlockingConsumer 时遇到问题。每当我调用 consumer.nextMessage() 时,我都会一遍又一遍地收到相同的消息。这是双重奇怪的,因为它应该被确认并从队列中删除,而对于另一个消费者甚至不应该理会它,因为我们使用的唯一 ID 不再绑定到队列。 nextMessage 甚至 returns 在渲染器服务完成并发回其完成消息之前。
这是简化的设置:
一般
所有服务都对所有消息使用全局 DirectExchange
@Bean
public DirectExchange globalDirectExchange() {
return new DirectExchange(EXCHANGE_NAME, false, true);
}
ServiceA(处理 HTTP 请求):
private Content requestPreviewByKey(RenderMessage renderMessage, String previewKey) {
String renderDoneRoutingKey= UUID.randomUUID().toString();
renderMessage.setPreviewDoneKey(renderDoneId);
Binding binding = BindingBuilder.bind(previewDoneQueue).to(globalDirectExchange)
.with(renderDoneRoutingKey);
try {
amqpAdmin.declareBinding(binding);
rabbitProducer.sendPreviewRequestToKey(renderMessage, previewKey);
return getContentBlocking();
} catch (Exception e) {
logErrorIfDebug(type, e);
throw new ApiException(BaseErrorCode.COMMUNICATION_ERROR, "Could not render preview");
} finally {
amqpAdmin.removeBinding(binding);
}
}
private Content getContentBlocking() {
BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(rabbitMqConfig.connectionFactory(), new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<>(), AcknowledgeMode.AUTO, true, 1, PREVIEW_DONE_QUEUE);
try {
blockingQueueConsumer.start();
Message message = blockingQueueConsumer.nextMessage(waitForPreviewMs);
if (!StringUtils.isEmpty(message)) {
String result = new String(message.getBody());
return JsonUtils.stringToObject(result, Content.class);
}
throw new ApiException("Could not render preview");
} catch (Exception e) {
logError(e);
throw new ApiException("Could not render preview");
} finally {
blockingQueueConsumer.stop();
}
}
服务 B
我会为您省去大部分代码。我的日志显示一切顺利,一旦完成,服务就会将正确的消息发送到与初始呈现请求一起发送的 UUID 密钥。
public void sendPreviewDoneMessage(Content content, String previewDoneKey) {
String message = JsonUtils.objectToString(content);
rabbitTemplate.convertAndSend(globalDirectExchange, previewDoneKey, message);
}
整个过程都有效...一次... 真正的问题似乎是消费者设置。为什么我在使用 nextMessage() 时总是从队列中获取相同的(第一条)消息。 创建和删除 Bindung 是否确保在该实例中甚至只接收到绑定到该 routingKey 的消息? nextMessage() 不会确认消息并将其从队列中删除吗?!
非常感谢您的耐心等待,更要感谢您提供有用的答案!
BlockingQueueConsumer
不是设计用来直接使用的;它是 SimpleMessageListenerContainer
的一个组件,它将在消息被侦听器使用后负责确认消息(容器调用 commitIfNecessary
)。
直接使用这个消费者可能会有其他意想不到的副作用。
我强烈建议使用侦听器容器来消费消息。
如果您只想按需接收消息,请改用 RabbitTemplate
receive()
或 receiveAndConvert()
方法。