RabbitMQ BasicAck 使下一条消息 UnAck

RabbitMQ BasicAck makes next message UnAck

场景如下

一开始, 就绪队列:2 未确认:0

一旦 consumer.Queue.Dequeue(1000, out bdea); 运行s,

就绪队列:1 未确认:1 这很明显,我们阅读了一条消息但尚未确认。

问题是当 channel.BasicAck(bdea.DeliveryTag, false); 运行s,

就绪队列:0 未确认:1

处于就绪状态的消息变为 UnAcked,ReadyQueue 变为“0”!!

现在,在 while 循环中,当我们使用 consumer.Queue.Dequeue(1000, out bdea); 查找第二条消息时,bdea returns null 因为没有任何东西处于就绪状态。

这就是问题所在,当发生 Ack 时,它总是将一条消息从 Ready 队列拖到 UnAck。因此,下次我丢失这个从未出队的 UnAcked 消息时。

但是如果我停止进程(控制台应用程序),UnAck 消息会返回到就绪状态。

假设开始时有 10 条消息处于 Ready 状态,最后它只会处理 5 条消息,其中有 5 条消息处于 UnAcked 状态。每个 Ack 使下一条消息 UnAck。如果我停止并再次 运行(5 条消息处于 Ready 状态),猜猜看,3 条消息将得到处理,2 条将被 UnAcked。 (Dequeue 只选择消息数量的一半)

这是我的代码(仅具有 RabbitMQ 功能的代码,如果您也尝试此代码,则会出现问题),

public class TestMessages
{
    private ConnectionFactory factory = new ConnectionFactory();
    string billingFileId = string.Empty;
    private IConnection connection = null;
    private IModel channel = null;
    public void Listen()
    {
        try
        {
            #region CONNECT
            factory.AutomaticRecoveryEnabled = true;
            factory.UserName = ConfigurationManager.AppSettings["MQUserName"];
            factory.Password = ConfigurationManager.AppSettings["MQPassword"];
            factory.VirtualHost = ConfigurationManager.AppSettings["MQVirtualHost"];
            factory.HostName = ConfigurationManager.AppSettings["MQHostName"];
            factory.Port = Convert.ToInt32(ConfigurationManager.AppSettings["MQPort"]);
            #endregion


            RabbitMQ.Client.Events.BasicDeliverEventArgs bdea;
            using (connection = factory.CreateConnection())
            {
                string jobId = string.Empty;
                using (IModel channel = connection.CreateModel())
                {
                    while (true) //KEEP LISTNING
                    {
                        if (!channel.IsOpen)
                            throw new Exception("Channel is closed"); //Exit the loop.

                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);

                        //Prefetch 1 message
                        channel.BasicQos(0, 1, false);

                        String consumerTag = channel.BasicConsume(ConfigurationManager.AppSettings["MQQueueName"], false, consumer);

                        try
                        {
                            //Pull out the message
                            consumer.Queue.Dequeue(1000, out bdea);
                            if (bdea == null)
                            {
                                //Empty Queue
                            }
                            else
                            {
                                IBasicProperties props = bdea.BasicProperties;
                                byte[] body = bdea.Body;
                                string message = System.Text.Encoding.Default.GetString(bdea.Body);

                                try
                                {

                                    channel.BasicAck(bdea.DeliveryTag, false);

                                    ////Heavy work starts now......

                                }
                                catch (Exception ex)
                                {
                                    //Log

                                }
                            }
                        }
                        catch (Exception ex)
                        {
                            //Log it
                        }
                    }
                }
            }
        }
        catch (Exception ex)
        {
            WriteLog.Error(ex);
        }
        finally
        {
            //CleanUp();
        }
    }
}

我是不是漏掉了什么?

我尝试使用“订阅”而不是频道,它现在可以工作了,清除了消息队列。我提到了 this post

这是工作代码:

public void SubscribeListner()
{
    Subscription subscription = null;
    const string uploaderExchange = "myQueueExchange";
    string queueName = "myQueue";
    while (true)
    {
        try
        {
            if (subscription == null)
            {
                try
                {
                   //CONNECT Code

                    //try to open connection
                    connection = factory.CreateConnection();
                }
                catch (BrokerUnreachableException ex)
                {
                    //You probably want to log the error and cancel after N tries, 
                    //otherwise start the loop over to try to connect again after a second or so.
                    //log.Error(ex);
                    continue;
                }

                
                //crate chanel
                channel = connection.CreateModel();
                // This instructs the channel not to prefetch more than one message
                channel.BasicQos(0, 1, false);
                // Create a new, durable exchange
                channel.ExchangeDeclare(uploaderExchange, ExchangeType.Direct, true, false, null);
                // Create a new, durable queue
                channel.QueueDeclare(queueName, true, false, false, null);
                // Bind the queue to the exchange
                channel.QueueBind(queueName, uploaderExchange, queueName);
                //create subscription
                subscription = new Subscription(channel, uploaderExchange, false);
            }
            BasicDeliverEventArgs eventArgs;
            var gotMessage = subscription.Next(250, out eventArgs);//250 millisecond
            if (gotMessage)
            {
                if (eventArgs == null)
                {
                    //This means the connection is closed.
                    //DisposeAllConnectionObjects();
                    continue;//move to new iterate
                }

                //process message
                subscription.Ack(); 
                //channel.BasicAck(eventArgs.DeliveryTag, false);
            }
        }
        catch (OperationInterruptedException ex)
        {
            //log.Error(ex);
            //DisposeAllConnectionObjects();
        }
        catch(Exception ex)
        {

        }
    }
}