批量消费消息——RabbitMQ

Consume messages in batches - RabbitMQ

我能够使用上面的代码使用不同的路由密钥使用多个生产者发送到同一个交换的多条消息,并且能够将每条消息插入数据库。

但这会消耗太多资源,因为消息将一个接一个地插入到数据库中。所以我决定进行批量插入,我发现我可以设置 BasicQos

在BasicQos中将消息限制设置为10后,我的期望是Console.WriteLine必须写10条消息,但不是预期的那样。

我的期望是从队列中消耗 N 条消息并进行批量插入,并在成功发送 ACK 时发送 ACK 否则没有 ACK

这是我使用的一段代码。

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A");
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_B");

        channel.BasicQos(0, 10, false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);

        consumer.Received += (model, ea) =>
        {
            try
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                // Insert into Database

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                Console.WriteLine(" Recevier Ack  " + ea.DeliveryTag);
            }
            catch (Exception e)
            {
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                Console.WriteLine(" Recevier No Ack  " + ea.DeliveryTag);
            }
        };

        Console.ReadLine();
    }
}

BasicQos = 10 表示客户端一次只能获取 10 条消息,但是当您使用它时,您每次只会看到一条消息。 阅读此处:https://www.rabbitmq.com/consumer-prefetch.html

AMQP specifies the basic.qos method to allow you to limit the number of unacknowledged messages on a channel (or connection) when consuming (aka "prefetch count").

对于您的范围,您必须下载消息,将其放入临时列表中,然后插入到数据库中。

然后你可以使用:

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);

void basicAck()

Parameters: deliveryTag - the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver

multiple - true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.

例子

final List<String> myMessagges = new ArrayList<String>();
        channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                myMessagges.add(new String(body));
                System.out.println("Received...");

                if (myMessagges.size() >= 10) {
                    System.out.println("insert into DB...");
                    channel.basicAck(envelope.getDeliveryTag(), true);
                    myMessagges.clear();
                }


            }
        });

可以使用 channel.basicQos() 完成基于批量大小的消耗。

Channel channel = connection.createChannel();
channel.basicQos(10);

它指定了在不为每个消息发送 ACK 的情况下要获取的最大消息数。

使用 DefaultConsumer class 并覆盖其方法。

Consumer batchConsumer = new DefaultConsumer(channel) {

  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
  }

  @Override
  public void handleCancelOk(String consumerTag) {
 
  }
};

使用 channel.basicConsume()

消耗 10 条消息
channel.basicConsume(QUEUE_NAME, false, batchConsumer);

当调用 channel.basicConsume() 时,它将获取一批 10 条消息。 'false' 设置为禁用自动确认,并且在消耗整个批次后只发送一次确认。

channel.basicAck(getLastMessageEnvelope().getDeliveryTag(), true);

这里'true'表示我们正在为多个消息发送ACK。

详细解释见

RabbitMQ Batch Consumption