消息自动删除 RabbitMQ
Message deleted automatically RabbitMQ
我已经使用 RabbitMQ 来存储消息。我注意到当应用程序重新启动时消息被删除了。
我在同一个应用程序中有生产者和消费者。
请找到生产者和消费者如下。我用过持久队列和持久消息。
所以如果队列只有一个消费者并且当前没有消费,那么队列消息将被删除。是这样吗?
制作人:
public static void PublishMessage(RequestDto message, string queueName)
{
var factory = new ConnectionFactory() { HostName = Config.RabbitMqHostName, Port = Config.RabbitMqPortNumber };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, true, false, false, null);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
// properties.DeliveryMode = 2; I have used this too.
string serializesMessage = Utility.SerializeSoapObject(message);
var messageBytes = Encoding.UTF8.GetBytes(serializesMessage);
channel.BasicPublish("", queueName, properties , messageBytes);
Log.Info("Record added into queue : \nMessage: " + serializesMessage);
}
}
}
消费者:
var factory = new ConnectionFactory() { HostName = Config.RabbitMqHostName, Port = Config.RabbitMqPortNumber };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(Config.RabbitMqQueueName, true, false, false, null);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(Config.RabbitMqQueueName, true, consumer);
while (DoProcessMessage())
{
try
{
List<RequestDto> messages = GetMessagesInBatch(consumer);
if (messages.Count > 0)
{
ProcessMessageInParallel(messages);
}
else
{
Producer.FillRequestMessages();
}
}
catch (Exception exception)
{
Log.Error("StartConsumer - Failed to process message from RabbitMq Error: " + exception.Message, exception);
}
}
}
}
}
catch (Exception exception)
{
Log.Error(exception.Message, exception);
}
private bool DoProcessMessage()
{
return Config.MaxRequestPerDayCount > 1000;
}
如果有人能帮忙。
您似乎将 noAck = true
传递给 basicConsume 函数:
https://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.0/rabbitmq-java-client-javadoc-1.7.0/com/rabbitmq/client/Channel.html#basicConsume(java.lang.String,布尔值,com.rabbitmq.client.Consumer)
在无确认模式下,RabbitMQ 会将消息发送给消费者并立即将其从队列中删除。
我已经使用 RabbitMQ 来存储消息。我注意到当应用程序重新启动时消息被删除了。
我在同一个应用程序中有生产者和消费者。
请找到生产者和消费者如下。我用过持久队列和持久消息。
所以如果队列只有一个消费者并且当前没有消费,那么队列消息将被删除。是这样吗?
制作人:
public static void PublishMessage(RequestDto message, string queueName)
{
var factory = new ConnectionFactory() { HostName = Config.RabbitMqHostName, Port = Config.RabbitMqPortNumber };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, true, false, false, null);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
// properties.DeliveryMode = 2; I have used this too.
string serializesMessage = Utility.SerializeSoapObject(message);
var messageBytes = Encoding.UTF8.GetBytes(serializesMessage);
channel.BasicPublish("", queueName, properties , messageBytes);
Log.Info("Record added into queue : \nMessage: " + serializesMessage);
}
}
}
消费者:
var factory = new ConnectionFactory() { HostName = Config.RabbitMqHostName, Port = Config.RabbitMqPortNumber };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(Config.RabbitMqQueueName, true, false, false, null);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(Config.RabbitMqQueueName, true, consumer);
while (DoProcessMessage())
{
try
{
List<RequestDto> messages = GetMessagesInBatch(consumer);
if (messages.Count > 0)
{
ProcessMessageInParallel(messages);
}
else
{
Producer.FillRequestMessages();
}
}
catch (Exception exception)
{
Log.Error("StartConsumer - Failed to process message from RabbitMq Error: " + exception.Message, exception);
}
}
}
}
}
catch (Exception exception)
{
Log.Error(exception.Message, exception);
}
private bool DoProcessMessage()
{
return Config.MaxRequestPerDayCount > 1000;
}
如果有人能帮忙。
您似乎将 noAck = true
传递给 basicConsume 函数:
https://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.0/rabbitmq-java-client-javadoc-1.7.0/com/rabbitmq/client/Channel.html#basicConsume(java.lang.String,布尔值,com.rabbitmq.client.Consumer)
在无确认模式下,RabbitMQ 会将消息发送给消费者并立即将其从队列中删除。