RabbitMQ DefaultConsumer 导致消费者标签过多
RabbitMQ DefaultConsumer causing too many consumer tags
我有一个侦听特定队列的 RabbitMQ 客户端应用程序。客户端创建 DefaultConsumer 的实例并实现 handleDelivery 方法。这是代码
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
public void receiveMessages() {
try {
// channel.basicQos(pollCount);
Message message = new Message();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String response = new String(body, "UTF-8");
if (response != null) {
message.setId(NUID.nextGlobal());
message.setPayload(response);
message.setDeliveryTag(deliveryTag);
messages.add(message);
logger.info("Message received: ", message.getPayload());
}
}
};
logger.debug("**********Channel status: " + channel.isOpen());
channel.basicConsume(queueName, false, consumer);
} catch (Exception e) {
logger.error("Exception while getting messages from Rabbit ", e);
}
}
receiveMessages() 方法每 500 毫秒通过一个线程频繁调用,并将消息排入不同的列表以供消费。由于对 receiveMessages() 的这次投票,我观察到消费者标签在通过 Rabbit 控制台查看时不断创建和增长,如图中所示。看到那些越来越多的消费标签是正常的吗?
Is it normal to see those increasing consumer tags?
不,您的代码有错误。您需要要么只使用一个 long-运行ning 消费者,要么您必须在使用完后取消您的消费者。
我看不出有什么必要 "poll" receiveMessages
- 让它自己 运行 吧,它会按照你的预期将消息添加到你的同步队列中。
注意: RabbitMQ 团队监控 rabbitmq-users
mailing list 并且有时只在 Whosebug 上回答问题。
我终于找到了可行的解决方案。
正如 Luke Bakken 强调的那样,不需要轮询。我现在只调用 receiveMesssages()
一次。此后,当消息发布到队列中时,我的消费者正在接收回调。
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
public void receiveMessages() {
try {
Message message = new Message();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
String response = new String(delivery.getBody(), "UTF-8");
if (response != null) {
message.setId(NUID.nextGlobal());
message.setPayload(response);
message.setDeliveryTag(deliveryTag);
messages.add(message);
logger.info("Message received: ", message.getPayload());
};
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
} catch (Exception e) {
logger.error("Exception while getting messages from Rabbit ", e);
}
}
rabbit 控制台现在在绑定队列下只显示 1 个消费标签条目。
public NotificationConsumerService(ConnectionFactory connectionFactory, String host, Logger logger) {
this.connectionFactory = connectionFactory;
this.host = host;
this.logger = logger;
}
public void consumeSliceChangeNotification() {
connectionFactory.setHost(this.host);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
JSONObject obj = new JSONObject(message);
String namespace = obj.getString("namespace");
logger.info("The latest change notification on the " + namespace +" is available");
};
channel.basicConsume(QUEUE_NAME, true,deliverCallback, consumerTag -> { } );
}
catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
我有一个侦听特定队列的 RabbitMQ 客户端应用程序。客户端创建 DefaultConsumer 的实例并实现 handleDelivery 方法。这是代码
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
public void receiveMessages() {
try {
// channel.basicQos(pollCount);
Message message = new Message();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String response = new String(body, "UTF-8");
if (response != null) {
message.setId(NUID.nextGlobal());
message.setPayload(response);
message.setDeliveryTag(deliveryTag);
messages.add(message);
logger.info("Message received: ", message.getPayload());
}
}
};
logger.debug("**********Channel status: " + channel.isOpen());
channel.basicConsume(queueName, false, consumer);
} catch (Exception e) {
logger.error("Exception while getting messages from Rabbit ", e);
}
}
receiveMessages() 方法每 500 毫秒通过一个线程频繁调用,并将消息排入不同的列表以供消费。由于对 receiveMessages() 的这次投票,我观察到消费者标签在通过 Rabbit 控制台查看时不断创建和增长,如图中所示。看到那些越来越多的消费标签是正常的吗?
Is it normal to see those increasing consumer tags?
不,您的代码有错误。您需要要么只使用一个 long-运行ning 消费者,要么您必须在使用完后取消您的消费者。
我看不出有什么必要 "poll" receiveMessages
- 让它自己 运行 吧,它会按照你的预期将消息添加到你的同步队列中。
注意: RabbitMQ 团队监控 rabbitmq-users
mailing list 并且有时只在 Whosebug 上回答问题。
我终于找到了可行的解决方案。
正如 Luke Bakken 强调的那样,不需要轮询。我现在只调用 receiveMesssages()
一次。此后,当消息发布到队列中时,我的消费者正在接收回调。
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
public void receiveMessages() {
try {
Message message = new Message();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
String response = new String(delivery.getBody(), "UTF-8");
if (response != null) {
message.setId(NUID.nextGlobal());
message.setPayload(response);
message.setDeliveryTag(deliveryTag);
messages.add(message);
logger.info("Message received: ", message.getPayload());
};
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
} catch (Exception e) {
logger.error("Exception while getting messages from Rabbit ", e);
}
}
rabbit 控制台现在在绑定队列下只显示 1 个消费标签条目。
public NotificationConsumerService(ConnectionFactory connectionFactory, String host, Logger logger) {
this.connectionFactory = connectionFactory;
this.host = host;
this.logger = logger;
}
public void consumeSliceChangeNotification() {
connectionFactory.setHost(this.host);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
JSONObject obj = new JSONObject(message);
String namespace = obj.getString("namespace");
logger.info("The latest change notification on the " + namespace +" is available");
};
channel.basicConsume(QUEUE_NAME, true,deliverCallback, consumerTag -> { } );
}
catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}