只有一个消费者从 RabbitMQ 上的队列接收消息
only one consumer receives messages from the queue on RabbitMQ
我创建了一个小演示来展示 RabbitMQ 的基础知识。不幸的是,它没有按预期工作,有两个问题。我正在使用 .NET Core 3.1 和 RabbitMQ.Client 6.2.2
我创建了从任务队列接收消息的 Employee class。第一位员工工作得很好,但如果我开始更多员工,他们就无法工作(不接收消息)。我不明白为什么会这样。
如果我在队列中有很多消息(在启动第二个员工之前),我会看到任务队列中的所有消息在第二个开始时得到确认,然后在短时间后它们再次变为未确认。有点奇怪。
但主要是:为什么其他员工不工作?
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace DemoTasks.Employee
{
class Employee
{
static void Main(string[] args)
{
string clientName = "Employee-" + Guid.NewGuid().ToString();
Console.Title = clientName;
Console.WriteLine("Moin moin");
IConnectionFactory connectionFactory = new ConnectionFactory
{
HostName = "localhost",
Port = 5672,
VirtualHost = "/",
UserName = "user",
Password = "password",
ClientProvidedName = clientName
};
using (IConnection connection = connectionFactory.CreateConnection(clientName))
{
using (IModel model = connection.CreateModel())
{
model.ExchangeDeclare("jobs", "fanout", false, false, null);
model.QueueDeclare("tasks", true, false, false);
model.QueueBind("tasks", "jobs", "", null);
EventingBasicConsumer consumer = new EventingBasicConsumer(model);
consumer.Received += OnWorkReceived;
model.BasicConsume("tasks", false, clientName + ":OnWorkReceived", consumer);
Console.ReadLine();
model.Close();
}
connection.Close();
}
Console.WriteLine("Wochenende ... woooh !!!");
}
private static void OnWorkReceived(object sender, BasicDeliverEventArgs e)
{
EventingBasicConsumer consumer = (EventingBasicConsumer)sender;
IModel model = consumer.Model;
string task = Encoding.UTF8.GetString(e.Body.ToArray());
Console.Write("working on: " + task + " ... ");
Thread.Sleep(5000);
Console.WriteLine("done!");
model.BasicAck(e.DeliveryTag, false);
}
}
}
我认为您的问题是关于在您的频道上设置 PrefetchCount
。它是关于一个消费者可以从 rabbit 获取多少消息并将它们缓存在自己身上来处理它们。
如果不设置,一个消费者可以消费队列中的所有消息,其他消费者没有时间获取消息,所以可以使用channel.basicQos(1)
或basicqos(0,1,false)
来设置。通过此设置,每个消费者在发送 ack 给 rabbit 后可以收到一条消息,然后可以收到另一条消息。
将预取计数设置为较低的数字时,可能会影响性能,因为您的消费者必须更多地请求 rabbit 才能获取消息。
我创建了一个小演示来展示 RabbitMQ 的基础知识。不幸的是,它没有按预期工作,有两个问题。我正在使用 .NET Core 3.1 和 RabbitMQ.Client 6.2.2
我创建了从任务队列接收消息的 Employee class。第一位员工工作得很好,但如果我开始更多员工,他们就无法工作(不接收消息)。我不明白为什么会这样。
如果我在队列中有很多消息(在启动第二个员工之前),我会看到任务队列中的所有消息在第二个开始时得到确认,然后在短时间后它们再次变为未确认。有点奇怪。
但主要是:为什么其他员工不工作?
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace DemoTasks.Employee
{
class Employee
{
static void Main(string[] args)
{
string clientName = "Employee-" + Guid.NewGuid().ToString();
Console.Title = clientName;
Console.WriteLine("Moin moin");
IConnectionFactory connectionFactory = new ConnectionFactory
{
HostName = "localhost",
Port = 5672,
VirtualHost = "/",
UserName = "user",
Password = "password",
ClientProvidedName = clientName
};
using (IConnection connection = connectionFactory.CreateConnection(clientName))
{
using (IModel model = connection.CreateModel())
{
model.ExchangeDeclare("jobs", "fanout", false, false, null);
model.QueueDeclare("tasks", true, false, false);
model.QueueBind("tasks", "jobs", "", null);
EventingBasicConsumer consumer = new EventingBasicConsumer(model);
consumer.Received += OnWorkReceived;
model.BasicConsume("tasks", false, clientName + ":OnWorkReceived", consumer);
Console.ReadLine();
model.Close();
}
connection.Close();
}
Console.WriteLine("Wochenende ... woooh !!!");
}
private static void OnWorkReceived(object sender, BasicDeliverEventArgs e)
{
EventingBasicConsumer consumer = (EventingBasicConsumer)sender;
IModel model = consumer.Model;
string task = Encoding.UTF8.GetString(e.Body.ToArray());
Console.Write("working on: " + task + " ... ");
Thread.Sleep(5000);
Console.WriteLine("done!");
model.BasicAck(e.DeliveryTag, false);
}
}
}
我认为您的问题是关于在您的频道上设置 PrefetchCount
。它是关于一个消费者可以从 rabbit 获取多少消息并将它们缓存在自己身上来处理它们。
如果不设置,一个消费者可以消费队列中的所有消息,其他消费者没有时间获取消息,所以可以使用channel.basicQos(1)
或basicqos(0,1,false)
来设置。通过此设置,每个消费者在发送 ack 给 rabbit 后可以收到一条消息,然后可以收到另一条消息。
将预取计数设置为较低的数字时,可能会影响性能,因为您的消费者必须更多地请求 rabbit 才能获取消息。