我怎样才能让我的 RabbitMQ 消费者只接收一条消息而不会超时并再次发送消息?

How can I have my RabbitMQ consumer receive just one message without it timing-out and sending agan?

我们遇到了一个问题,如果一条消息的处理时间太长,它会从未确认状态移回就绪状态并再次发送。有什么办法可以避免吗?

我们有一个这样初始化的队列消费者class:

    private void Initialize()
    {
        string queueName = _configuration["QueueName"] + _configuration["QueueNameSuffixForSubscriber"];

        var factory = CreateConnectionFactory();

        var connection = factory.CreateConnection();
        _channel = connection.CreateModel();

        // The message TTL must match for QueueDeclare() to work.
        var arguments = new Dictionary<string, object>();
        arguments.Add("x-message-ttl", Convert.ToInt32(_configuration["EventBusMessageTtl"]));

        // By default, RabbitMQ dispatches all the messages to the first consumer. You can change this behaviour by
        // setting the BasicQos, this controls the no of messages a consumer can receive before it acknowledges it.
        _channel.BasicQos(0, 1, false);

        _channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += ConsumerReceived;

        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

注意这一行:

_channel.BasicQos(0, 1, false);

这就是我们的消费者一次只提取一条消息的方式。但是,如果该消息在发送 ack 之前花费的时间超过 2 分钟,RMQ 将再次发送该消息。我们不想要那样。 (几乎不会发生处理时间超过 2 分钟的情况,但我们不想满足于 almost。)

有没有办法阻止RMQ再次发送消息?我可以在 before 处理消息之前发送 ack,但是我们会立即收到下一条消息,我们也不希望这样。我们希望在接受下一条消息之前等待消息完成处理。

如果我们可以在准备就绪时从 RMQ 中,那就可以解决问题。

这里是 ConsumerReceived() 方法:

    private void ConsumerReceived(object model, BasicDeliverEventArgs eventArgs)
    {
        try
        {
            var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
            InvokeHandlers(eventArgs, message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
        }
        finally
        {
            _channel.BasicAck(eventArgs.DeliveryTag, false);
        }
    }

我同意这似乎是 polling 的理想流程,而不是消费者订阅。通常您不想轮询,因为它会极大地损害吞吐量,但在您的情况下,这正是您想要的。

while (true)
{
    BasicGetResult result = channel.BasicGet(queueName, noAck);
    if (result == null) 
    {
        // No message available at this time.
        // Sleep/delay to avoid cpu and I/O churn
        Thread.Sleep(2000);
    } 
    else 
    {
        try 
        {
            IBasicProperties props = result.BasicProperties;
            var message = Encoding.UTF8.GetString(result.Body.ToArray());
            InvokeHandlers(eventArgs, message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
        }
        finally
        {
            _channel.BasicAck(eventArgs.DeliveryTag, false);
        }
    }
}