RabbitMQ DefaultConsumer 导致消费者标签过多

RabbitMQ DefaultConsumer causing too many consumer tags

我有一个侦听特定队列的 RabbitMQ 客户端应用程序。客户端创建 DefaultConsumer 的实例并实现 handleDelivery 方法。这是代码

    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    public void receiveMessages() {
        try {
//            channel.basicQos(pollCount);
            Message message = new Message();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String response = new String(body, "UTF-8");
                    if (response != null) {
                        message.setId(NUID.nextGlobal());
                        message.setPayload(response);
                        message.setDeliveryTag(deliveryTag);
                        messages.add(message);
                        logger.info("Message received: ", message.getPayload());
                    }
                }
            };
            logger.debug("**********Channel status: " + channel.isOpen());
            channel.basicConsume(queueName, false, consumer);
        } catch (Exception e) {
            logger.error("Exception while getting messages from Rabbit ", e);

        }
    }

receiveMessages() 方法每 500 毫秒通过一个线程频繁调用,并将消息排入不同的列表以供消费。由于对 receiveMessages() 的这次投票,我观察到消费者标签在通过 Rabbit 控制台查看时不断创建和增长,如图中所示。看到那些越来越多的消费标签是正常的吗?

Is it normal to see those increasing consumer tags?

不,您的代码有错误。您需要要么只使用一个 long-运行ning 消费者,要么您必须在使用完后取消您的消费者。

我看不出有什么必要 "poll" receiveMessages - 让它自己 运行 吧,它会按照你的预期将消息添加到你的同步队列中。


注意: RabbitMQ 团队监控 rabbitmq-users mailing list 并且有时只在 Whosebug 上回答问题。

我终于找到了可行的解决方案。 正如 Luke Bakken 强调的那样,不需要轮询。我现在只调用 receiveMesssages() 一次。此后,当消息发布到队列中时,我的消费者正在接收回调。

 protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
 public void receiveMessages() {
    try {
        Message message = new Message();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            String response = new String(delivery.getBody(), "UTF-8");
            if (response != null) {
                message.setId(NUID.nextGlobal());
                message.setPayload(response);
                message.setDeliveryTag(deliveryTag);
                messages.add(message);
                logger.info("Message received: ", message.getPayload());
            };
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
    } catch (Exception e) {
        logger.error("Exception while getting messages from Rabbit ", e);
    }
}

rabbit 控制台现在在绑定队列下只显示 1 个消费标签条目。

public NotificationConsumerService(ConnectionFactory connectionFactory, String host, Logger logger) {
    this.connectionFactory = connectionFactory;
    this.host = host;
    this.logger = logger;
}

public void consumeSliceChangeNotification() {
    connectionFactory.setHost(this.host);
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            JSONObject obj = new JSONObject(message);
            String namespace = obj.getString("namespace");
            logger.info("The latest change notification on the " + namespace +" is available");
        };

        channel.basicConsume(QUEUE_NAME, true,deliverCallback, consumerTag -> { } );
    } 
    catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }

}