我怎样才能让我的 RabbitMQ 消费者只接收一条消息而不会超时并再次发送消息?
How can I have my RabbitMQ consumer receive just one message without it timing-out and sending agan?
我们遇到了一个问题,如果一条消息的处理时间太长,它会从未确认状态移回就绪状态并再次发送。有什么办法可以避免吗?
我们有一个这样初始化的队列消费者class:
private void Initialize()
{
string queueName = _configuration["QueueName"] + _configuration["QueueNameSuffixForSubscriber"];
var factory = CreateConnectionFactory();
var connection = factory.CreateConnection();
_channel = connection.CreateModel();
// The message TTL must match for QueueDeclare() to work.
var arguments = new Dictionary<string, object>();
arguments.Add("x-message-ttl", Convert.ToInt32(_configuration["EventBusMessageTtl"]));
// By default, RabbitMQ dispatches all the messages to the first consumer. You can change this behaviour by
// setting the BasicQos, this controls the no of messages a consumer can receive before it acknowledges it.
_channel.BasicQos(0, 1, false);
_channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += ConsumerReceived;
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
注意这一行:
_channel.BasicQos(0, 1, false);
这就是我们的消费者一次只提取一条消息的方式。但是,如果该消息在发送 ack 之前花费的时间超过 2 分钟,RMQ 将再次发送该消息。我们不想要那样。 (几乎不会发生处理时间超过 2 分钟的情况,但我们不想满足于 almost。)
有没有办法阻止RMQ再次发送消息?我可以在 before 处理消息之前发送 ack,但是我们会立即收到下一条消息,我们也不希望这样。我们希望在接受下一条消息之前等待消息完成处理。
如果我们可以在准备就绪时从 RMQ 中拉,那就可以解决问题。
这里是 ConsumerReceived()
方法:
private void ConsumerReceived(object model, BasicDeliverEventArgs eventArgs)
{
try
{
var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
InvokeHandlers(eventArgs, message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
}
finally
{
_channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
我同意这似乎是 polling 的理想流程,而不是消费者订阅。通常您不想轮询,因为它会极大地损害吞吐量,但在您的情况下,这正是您想要的。
while (true)
{
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null)
{
// No message available at this time.
// Sleep/delay to avoid cpu and I/O churn
Thread.Sleep(2000);
}
else
{
try
{
IBasicProperties props = result.BasicProperties;
var message = Encoding.UTF8.GetString(result.Body.ToArray());
InvokeHandlers(eventArgs, message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
}
finally
{
_channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
}
我们遇到了一个问题,如果一条消息的处理时间太长,它会从未确认状态移回就绪状态并再次发送。有什么办法可以避免吗?
我们有一个这样初始化的队列消费者class:
private void Initialize()
{
string queueName = _configuration["QueueName"] + _configuration["QueueNameSuffixForSubscriber"];
var factory = CreateConnectionFactory();
var connection = factory.CreateConnection();
_channel = connection.CreateModel();
// The message TTL must match for QueueDeclare() to work.
var arguments = new Dictionary<string, object>();
arguments.Add("x-message-ttl", Convert.ToInt32(_configuration["EventBusMessageTtl"]));
// By default, RabbitMQ dispatches all the messages to the first consumer. You can change this behaviour by
// setting the BasicQos, this controls the no of messages a consumer can receive before it acknowledges it.
_channel.BasicQos(0, 1, false);
_channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += ConsumerReceived;
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
注意这一行:
_channel.BasicQos(0, 1, false);
这就是我们的消费者一次只提取一条消息的方式。但是,如果该消息在发送 ack 之前花费的时间超过 2 分钟,RMQ 将再次发送该消息。我们不想要那样。 (几乎不会发生处理时间超过 2 分钟的情况,但我们不想满足于 almost。)
有没有办法阻止RMQ再次发送消息?我可以在 before 处理消息之前发送 ack,但是我们会立即收到下一条消息,我们也不希望这样。我们希望在接受下一条消息之前等待消息完成处理。
如果我们可以在准备就绪时从 RMQ 中拉,那就可以解决问题。
这里是 ConsumerReceived()
方法:
private void ConsumerReceived(object model, BasicDeliverEventArgs eventArgs)
{
try
{
var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
InvokeHandlers(eventArgs, message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
}
finally
{
_channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
我同意这似乎是 polling 的理想流程,而不是消费者订阅。通常您不想轮询,因为它会极大地损害吞吐量,但在您的情况下,这正是您想要的。
while (true)
{
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null)
{
// No message available at this time.
// Sleep/delay to avoid cpu and I/O churn
Thread.Sleep(2000);
}
else
{
try
{
IBasicProperties props = result.BasicProperties;
var message = Encoding.UTF8.GetString(result.Body.ToArray());
InvokeHandlers(eventArgs, message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
}
finally
{
_channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
}