RabbitMQ BasicAck 使下一条消息 UnAck
RabbitMQ BasicAck makes next message UnAck
场景如下
一开始,
就绪队列:2
未确认:0
一旦 consumer.Queue.Dequeue(1000, out bdea);
运行s,
就绪队列:1
未确认:1
这很明显,我们阅读了一条消息但尚未确认。
问题是当 channel.BasicAck(bdea.DeliveryTag, false);
运行s,
就绪队列:0
未确认:1
处于就绪状态的消息变为 UnAcked,ReadyQueue 变为“0”!!
现在,在 while 循环中,当我们使用 consumer.Queue.Dequeue(1000, out bdea);
查找第二条消息时,bdea returns null 因为没有任何东西处于就绪状态。
这就是问题所在,当发生 Ack 时,它总是将一条消息从 Ready 队列拖到 UnAck。因此,下次我丢失这个从未出队的 UnAcked 消息时。
但是如果我停止进程(控制台应用程序),UnAck 消息会返回到就绪状态。
假设开始时有 10 条消息处于 Ready 状态,最后它只会处理 5 条消息,其中有 5 条消息处于 UnAcked 状态。每个 Ack 使下一条消息 UnAck。如果我停止并再次 运行(5 条消息处于 Ready 状态),猜猜看,3 条消息将得到处理,2 条将被 UnAcked。 (Dequeue 只选择消息数量的一半)
这是我的代码(仅具有 RabbitMQ 功能的代码,如果您也尝试此代码,则会出现问题),
public class TestMessages
{
private ConnectionFactory factory = new ConnectionFactory();
string billingFileId = string.Empty;
private IConnection connection = null;
private IModel channel = null;
public void Listen()
{
try
{
#region CONNECT
factory.AutomaticRecoveryEnabled = true;
factory.UserName = ConfigurationManager.AppSettings["MQUserName"];
factory.Password = ConfigurationManager.AppSettings["MQPassword"];
factory.VirtualHost = ConfigurationManager.AppSettings["MQVirtualHost"];
factory.HostName = ConfigurationManager.AppSettings["MQHostName"];
factory.Port = Convert.ToInt32(ConfigurationManager.AppSettings["MQPort"]);
#endregion
RabbitMQ.Client.Events.BasicDeliverEventArgs bdea;
using (connection = factory.CreateConnection())
{
string jobId = string.Empty;
using (IModel channel = connection.CreateModel())
{
while (true) //KEEP LISTNING
{
if (!channel.IsOpen)
throw new Exception("Channel is closed"); //Exit the loop.
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
//Prefetch 1 message
channel.BasicQos(0, 1, false);
String consumerTag = channel.BasicConsume(ConfigurationManager.AppSettings["MQQueueName"], false, consumer);
try
{
//Pull out the message
consumer.Queue.Dequeue(1000, out bdea);
if (bdea == null)
{
//Empty Queue
}
else
{
IBasicProperties props = bdea.BasicProperties;
byte[] body = bdea.Body;
string message = System.Text.Encoding.Default.GetString(bdea.Body);
try
{
channel.BasicAck(bdea.DeliveryTag, false);
////Heavy work starts now......
}
catch (Exception ex)
{
//Log
}
}
}
catch (Exception ex)
{
//Log it
}
}
}
}
}
catch (Exception ex)
{
WriteLog.Error(ex);
}
finally
{
//CleanUp();
}
}
}
我是不是漏掉了什么?
我尝试使用“订阅”而不是频道,它现在可以工作了,清除了消息队列。我提到了 this post。
这是工作代码:
public void SubscribeListner()
{
Subscription subscription = null;
const string uploaderExchange = "myQueueExchange";
string queueName = "myQueue";
while (true)
{
try
{
if (subscription == null)
{
try
{
//CONNECT Code
//try to open connection
connection = factory.CreateConnection();
}
catch (BrokerUnreachableException ex)
{
//You probably want to log the error and cancel after N tries,
//otherwise start the loop over to try to connect again after a second or so.
//log.Error(ex);
continue;
}
//crate chanel
channel = connection.CreateModel();
// This instructs the channel not to prefetch more than one message
channel.BasicQos(0, 1, false);
// Create a new, durable exchange
channel.ExchangeDeclare(uploaderExchange, ExchangeType.Direct, true, false, null);
// Create a new, durable queue
channel.QueueDeclare(queueName, true, false, false, null);
// Bind the queue to the exchange
channel.QueueBind(queueName, uploaderExchange, queueName);
//create subscription
subscription = new Subscription(channel, uploaderExchange, false);
}
BasicDeliverEventArgs eventArgs;
var gotMessage = subscription.Next(250, out eventArgs);//250 millisecond
if (gotMessage)
{
if (eventArgs == null)
{
//This means the connection is closed.
//DisposeAllConnectionObjects();
continue;//move to new iterate
}
//process message
subscription.Ack();
//channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
catch (OperationInterruptedException ex)
{
//log.Error(ex);
//DisposeAllConnectionObjects();
}
catch(Exception ex)
{
}
}
}
场景如下
一开始, 就绪队列:2 未确认:0
一旦 consumer.Queue.Dequeue(1000, out bdea);
运行s,
就绪队列:1 未确认:1 这很明显,我们阅读了一条消息但尚未确认。
问题是当 channel.BasicAck(bdea.DeliveryTag, false);
运行s,
就绪队列:0 未确认:1
处于就绪状态的消息变为 UnAcked,ReadyQueue 变为“0”!!
现在,在 while 循环中,当我们使用 consumer.Queue.Dequeue(1000, out bdea);
查找第二条消息时,bdea returns null 因为没有任何东西处于就绪状态。
这就是问题所在,当发生 Ack 时,它总是将一条消息从 Ready 队列拖到 UnAck。因此,下次我丢失这个从未出队的 UnAcked 消息时。
但是如果我停止进程(控制台应用程序),UnAck 消息会返回到就绪状态。
假设开始时有 10 条消息处于 Ready 状态,最后它只会处理 5 条消息,其中有 5 条消息处于 UnAcked 状态。每个 Ack 使下一条消息 UnAck。如果我停止并再次 运行(5 条消息处于 Ready 状态),猜猜看,3 条消息将得到处理,2 条将被 UnAcked。 (Dequeue 只选择消息数量的一半)
这是我的代码(仅具有 RabbitMQ 功能的代码,如果您也尝试此代码,则会出现问题),
public class TestMessages
{
private ConnectionFactory factory = new ConnectionFactory();
string billingFileId = string.Empty;
private IConnection connection = null;
private IModel channel = null;
public void Listen()
{
try
{
#region CONNECT
factory.AutomaticRecoveryEnabled = true;
factory.UserName = ConfigurationManager.AppSettings["MQUserName"];
factory.Password = ConfigurationManager.AppSettings["MQPassword"];
factory.VirtualHost = ConfigurationManager.AppSettings["MQVirtualHost"];
factory.HostName = ConfigurationManager.AppSettings["MQHostName"];
factory.Port = Convert.ToInt32(ConfigurationManager.AppSettings["MQPort"]);
#endregion
RabbitMQ.Client.Events.BasicDeliverEventArgs bdea;
using (connection = factory.CreateConnection())
{
string jobId = string.Empty;
using (IModel channel = connection.CreateModel())
{
while (true) //KEEP LISTNING
{
if (!channel.IsOpen)
throw new Exception("Channel is closed"); //Exit the loop.
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
//Prefetch 1 message
channel.BasicQos(0, 1, false);
String consumerTag = channel.BasicConsume(ConfigurationManager.AppSettings["MQQueueName"], false, consumer);
try
{
//Pull out the message
consumer.Queue.Dequeue(1000, out bdea);
if (bdea == null)
{
//Empty Queue
}
else
{
IBasicProperties props = bdea.BasicProperties;
byte[] body = bdea.Body;
string message = System.Text.Encoding.Default.GetString(bdea.Body);
try
{
channel.BasicAck(bdea.DeliveryTag, false);
////Heavy work starts now......
}
catch (Exception ex)
{
//Log
}
}
}
catch (Exception ex)
{
//Log it
}
}
}
}
}
catch (Exception ex)
{
WriteLog.Error(ex);
}
finally
{
//CleanUp();
}
}
}
我是不是漏掉了什么?
我尝试使用“订阅”而不是频道,它现在可以工作了,清除了消息队列。我提到了 this post。
这是工作代码:
public void SubscribeListner()
{
Subscription subscription = null;
const string uploaderExchange = "myQueueExchange";
string queueName = "myQueue";
while (true)
{
try
{
if (subscription == null)
{
try
{
//CONNECT Code
//try to open connection
connection = factory.CreateConnection();
}
catch (BrokerUnreachableException ex)
{
//You probably want to log the error and cancel after N tries,
//otherwise start the loop over to try to connect again after a second or so.
//log.Error(ex);
continue;
}
//crate chanel
channel = connection.CreateModel();
// This instructs the channel not to prefetch more than one message
channel.BasicQos(0, 1, false);
// Create a new, durable exchange
channel.ExchangeDeclare(uploaderExchange, ExchangeType.Direct, true, false, null);
// Create a new, durable queue
channel.QueueDeclare(queueName, true, false, false, null);
// Bind the queue to the exchange
channel.QueueBind(queueName, uploaderExchange, queueName);
//create subscription
subscription = new Subscription(channel, uploaderExchange, false);
}
BasicDeliverEventArgs eventArgs;
var gotMessage = subscription.Next(250, out eventArgs);//250 millisecond
if (gotMessage)
{
if (eventArgs == null)
{
//This means the connection is closed.
//DisposeAllConnectionObjects();
continue;//move to new iterate
}
//process message
subscription.Ack();
//channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
catch (OperationInterruptedException ex)
{
//log.Error(ex);
//DisposeAllConnectionObjects();
}
catch(Exception ex)
{
}
}
}