Java RabbitMQ consumer.nextMessage 总是收到相同的消息

Java RabbitMQ consumer.nextMessage always gets same message

我们在分布式服务架构中使用 Java rabbitMq 和 spring 启动。一项服务获取 HTTP 请求并将其转发到未知队列进行处理。同时它必须等待另一个队列的响应才能终止 HTTP 请求。 (这是一个由渲染器完成工作的预览请求)。

可以有多个 ServiceA(HTTP 接口)和 ServiceB(呈现器)实例,因此对于每条预览消息,我们还会发送一个唯一 ID 以用作路由键。

我在使用 BlockingConsumer 时遇到问题。每当我调用 consumer.nextMessage() 时,我都会一遍又一遍地收到相同的消息。这是双重奇怪的,因为它应该被确认并从队列中删除,而对于另一个消费者甚至不应该理会它,因为我们使用的唯一 ID 不再绑定到队列。 nextMessage 甚至 returns 在渲染器服务完成并发回其完成消息之前。

这是简化的设置:

一般

所有服务都对所有消息使用全局 DirectExchange

@Bean
  public DirectExchange globalDirectExchange() {
    return new DirectExchange(EXCHANGE_NAME, false, true);
  }

ServiceA(处理 HTTP 请求):

 private Content requestPreviewByKey(RenderMessage renderMessage, String previewKey) {
    String renderDoneRoutingKey= UUID.randomUUID().toString();
    renderMessage.setPreviewDoneKey(renderDoneId);
    Binding binding = BindingBuilder.bind(previewDoneQueue).to(globalDirectExchange)
        .with(renderDoneRoutingKey);
    try {
      amqpAdmin.declareBinding(binding);
      rabbitProducer.sendPreviewRequestToKey(renderMessage, previewKey);
      return getContentBlocking();
    } catch (Exception e) {
      logErrorIfDebug(type, e);
      throw new ApiException(BaseErrorCode.COMMUNICATION_ERROR, "Could not render preview");
    } finally {
      amqpAdmin.removeBinding(binding);
    }
  }


  private Content getContentBlocking() {
    BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(rabbitMqConfig.connectionFactory(), new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<>(), AcknowledgeMode.AUTO, true, 1, PREVIEW_DONE_QUEUE);
    try {
      blockingQueueConsumer.start();
      Message message = blockingQueueConsumer.nextMessage(waitForPreviewMs);
      if (!StringUtils.isEmpty(message)) {
        String result = new String(message.getBody());
        return JsonUtils.stringToObject(result, Content.class);
      }    
      throw new ApiException("Could not render preview");
    } catch (Exception e) {
      logError(e);
      throw new ApiException("Could not render preview");
    } finally {
      blockingQueueConsumer.stop();
    }

}

服务 B

我会为您省去大部分代码。我的日志显示一切顺利,一旦完成,服务就会将正确的消息发送到与初始呈现请求一起发送的 UUID 密钥。

public void sendPreviewDoneMessage(Content content, String previewDoneKey) {
    String message = JsonUtils.objectToString(content);
    rabbitTemplate.convertAndSend(globalDirectExchange, previewDoneKey, message);
}

整个过程都有效...一次... 真正的问题似乎是消费者设置。为什么我在使用 nextMessage() 时总是从队列中获取相同的(第一条)消息。 创建和删除 Bindung 是否确保在该实例中甚至只接收到绑定到该 routingKey 的消息? nextMessage() 不会确认消息并将其从队列中删除吗?!

非常感谢您的耐心等待,更要感谢您提供有用的答案!

BlockingQueueConsumer不是设计用来直接使用的;它是 SimpleMessageListenerContainer 的一个组件,它将在消息被侦听器使用后负责确认消息(容器调用 commitIfNecessary)。

直接使用这个消费者可能会有其他意想不到的副作用。

我强烈建议使用侦听器容器来消费消息。

如果您只想按需接收消息,请改用 RabbitTemplate receive()receiveAndConvert() 方法。