批量消费消息——RabbitMQ
Consume messages in batches - RabbitMQ
我能够使用上面的代码使用不同的路由密钥使用多个生产者发送到同一个交换的多条消息,并且能够将每条消息插入数据库。
但这会消耗太多资源,因为消息将一个接一个地插入到数据库中。所以我决定进行批量插入,我发现我可以设置 BasicQos
在BasicQos中将消息限制设置为10后,我的期望是Console.WriteLine
必须写10条消息,但不是预期的那样。
我的期望是从队列中消耗 N 条消息并进行批量插入,并在成功发送 ACK 时发送 ACK 否则没有 ACK
这是我使用的一段代码。
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A");
channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_B");
channel.BasicQos(0, 10, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);
consumer.Received += (model, ea) =>
{
try
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
// Insert into Database
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine(" Recevier Ack " + ea.DeliveryTag);
}
catch (Exception e)
{
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
Console.WriteLine(" Recevier No Ack " + ea.DeliveryTag);
}
};
Console.ReadLine();
}
}
BasicQos = 10
表示客户端一次只能获取 10 条消息,但是当您使用它时,您每次只会看到一条消息。
阅读此处:https://www.rabbitmq.com/consumer-prefetch.html
AMQP specifies the basic.qos method to allow you to limit the number
of unacknowledged messages on a channel (or connection) when consuming
(aka "prefetch count").
对于您的范围,您必须下载消息,将其放入临时列表中,然后插入到数据库中。
然后你可以使用:
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);
void basicAck()
Parameters:
deliveryTag - the tag from the received
AMQP.Basic.GetOk or AMQP.Basic.Deliver
multiple - true to acknowledge
all messages up to and including the supplied delivery tag; false to
acknowledge just the supplied delivery tag.
例子
final List<String> myMessagges = new ArrayList<String>();
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
myMessagges.add(new String(body));
System.out.println("Received...");
if (myMessagges.size() >= 10) {
System.out.println("insert into DB...");
channel.basicAck(envelope.getDeliveryTag(), true);
myMessagges.clear();
}
}
});
可以使用 channel.basicQos() 完成基于批量大小的消耗。
Channel channel = connection.createChannel();
channel.basicQos(10);
它指定了在不为每个消息发送 ACK 的情况下要获取的最大消息数。
使用 DefaultConsumer class 并覆盖其方法。
Consumer batchConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
}
@Override
public void handleCancelOk(String consumerTag) {
}
};
使用 channel.basicConsume()
消耗 10 条消息
channel.basicConsume(QUEUE_NAME, false, batchConsumer);
当调用 channel.basicConsume() 时,它将获取一批 10 条消息。 'false' 设置为禁用自动确认,并且在消耗整个批次后只发送一次确认。
channel.basicAck(getLastMessageEnvelope().getDeliveryTag(), true);
这里'true'表示我们正在为多个消息发送ACK。
详细解释见
我能够使用上面的代码使用不同的路由密钥使用多个生产者发送到同一个交换的多条消息,并且能够将每条消息插入数据库。
但这会消耗太多资源,因为消息将一个接一个地插入到数据库中。所以我决定进行批量插入,我发现我可以设置 BasicQos
在BasicQos中将消息限制设置为10后,我的期望是Console.WriteLine
必须写10条消息,但不是预期的那样。
我的期望是从队列中消耗 N 条消息并进行批量插入,并在成功发送 ACK 时发送 ACK 否则没有 ACK
这是我使用的一段代码。
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A");
channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_B");
channel.BasicQos(0, 10, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);
consumer.Received += (model, ea) =>
{
try
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
// Insert into Database
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine(" Recevier Ack " + ea.DeliveryTag);
}
catch (Exception e)
{
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
Console.WriteLine(" Recevier No Ack " + ea.DeliveryTag);
}
};
Console.ReadLine();
}
}
BasicQos = 10
表示客户端一次只能获取 10 条消息,但是当您使用它时,您每次只会看到一条消息。
阅读此处:https://www.rabbitmq.com/consumer-prefetch.html
AMQP specifies the basic.qos method to allow you to limit the number of unacknowledged messages on a channel (or connection) when consuming (aka "prefetch count").
对于您的范围,您必须下载消息,将其放入临时列表中,然后插入到数据库中。
然后你可以使用:
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);
void basicAck()
Parameters: deliveryTag - the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver
multiple - true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.
例子
final List<String> myMessagges = new ArrayList<String>();
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
myMessagges.add(new String(body));
System.out.println("Received...");
if (myMessagges.size() >= 10) {
System.out.println("insert into DB...");
channel.basicAck(envelope.getDeliveryTag(), true);
myMessagges.clear();
}
}
});
可以使用 channel.basicQos() 完成基于批量大小的消耗。
Channel channel = connection.createChannel();
channel.basicQos(10);
它指定了在不为每个消息发送 ACK 的情况下要获取的最大消息数。
使用 DefaultConsumer class 并覆盖其方法。
Consumer batchConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
}
@Override
public void handleCancelOk(String consumerTag) {
}
};
使用 channel.basicConsume()
消耗 10 条消息channel.basicConsume(QUEUE_NAME, false, batchConsumer);
当调用 channel.basicConsume() 时,它将获取一批 10 条消息。 'false' 设置为禁用自动确认,并且在消耗整个批次后只发送一次确认。
channel.basicAck(getLastMessageEnvelope().getDeliveryTag(), true);
这里'true'表示我们正在为多个消息发送ACK。
详细解释见