C# RabbitMQ 为什么第二个工人不接工作?

C# RabbitMQ why is the second worker not picking up work?

我正在尝试了解所有 RabbitMQ 选项,我想我想要的只是一个工作队列,所以我有一个队列,工作人员只需取出一个项目并处理它。

我创建了一个新的 Direct Exchange(我认为这是对的!?)

首先我想知道,为什么在这个例子中会出现下面的情况:我向 exchange/queue 添加了 4 条新消息,此时没有启动任何工作者(worker)。然后我启动第一个工作者,接着启动第二个,但第二个一条消息也不处理,所有消息都由第一个工作者处理!?

我做错了什么,为什么这不起作用?请在下面找到完整的示例代码。

另外,发布确认(publisher confirms)似乎也没有正常工作,因为只是偶尔才会在最右边的输出窗口中显示“Message Acknowledged...”(消息已确认)。我已阅读 https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html 并浏览了后面的其他页面,但还是不太清楚。

发射器:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace PublishConfirms.Emit
{
    /// <summary>
    /// Console publisher: declares the exchange and queue, enables publisher confirms,
    /// publishes one persistent message and prints broker acks/nacks when they arrive.
    /// </summary>
    class Program
    {
        // Messages that have been published but not yet confirmed, keyed by publish sequence number.
        public static ConcurrentDictionary<ulong, string> _outstandingConfirms = new ConcurrentDictionary<ulong, string>();

        /// <summary>
        /// Connects to the broker on localhost, publishes a single message and then waits for Enter.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost"
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // Enable publisher confirms on this channel
                    channel.ConfirmSelect();

                    channel.ExchangeDeclare(exchange: "DirectExchange",
                        type: ExchangeType.Direct);

                    var queueName = "DirectExchangeQueue";
                    // Create the queue if it does not exist yet
                    channel.QueueDeclare(queue: queueName,
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    channel.BasicAcks += (sender, ea) =>
                    {
                        Console.WriteLine("Message Acknowledged with Delivery Tag {0}", ea.DeliveryTag);
                        // The message is confirmed, so stop tracking it
                        CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                    };

                    channel.BasicNacks += (sender, ea) =>
                    {
                        // The message was nack-ed: look up its body for the log line, then stop tracking it.
                        // When the tag is not tracked, TryGetValue leaves body as null.
                        _outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
                        Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
                        CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                    };

                    var message = "A message here";
                    var body = Encoding.UTF8.GetBytes(message);

                    // Track the message under the sequence number the next publish will use
                    _outstandingConfirms.TryAdd(channel.NextPublishSeqNo, message);

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    // Published with exchange "" and the queue name as routing key;
                    // the "DirectExchange" declared above is not used by this call.
                    channel.BasicPublish(exchange: "",
                        routingKey: queueName,
                        basicProperties: properties,
                        body: body);

                    Console.WriteLine("Sent message '{0}'", message);
                    // NOTE(review): nothing waits for the confirm before the using blocks dispose the
                    // channel and connection; presumably this is why the ack line appears only sometimes - confirm.
                }
            }
            // The channel and connection have already been disposed when this prompt is shown.
            Console.WriteLine("Press any key to exit");
            Console.ReadLine();
        }

        /// <summary>
        /// Removes confirmed (or nack-ed) entries from <c>_outstandingConfirms</c>.
        /// </summary>
        /// <param name="sequenceNumber">Delivery tag reported by the ack/nack callback.</param>
        /// <param name="multiple">
        /// When true, every tracked entry whose key is less than or equal to
        /// <paramref name="sequenceNumber"/> is removed; otherwise only that single key.
        /// </param>
        static void CleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
        {
            if (multiple)
            {
                var confirmed = _outstandingConfirms.Where(k => k.Key <= sequenceNumber);
                foreach (var entry in confirmed)
                {
                    _outstandingConfirms.TryRemove(entry.Key, out _);
                }
            }
            else
            {
                _outstandingConfirms.TryRemove(sequenceNumber, out _);
            }
        }
    }
}

worker/receiver

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace PublishConfirms.Receive
{
    /// <summary>
    /// Console worker: consumes "DirectExchangeQueue" and acknowledges each message
    /// once a simulated three-second job has finished.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point. Declares the exchange, queue and binding, subscribes a consumer with
        /// manual acknowledgements and keeps running until Enter is pressed.
        /// Intended to be started several times so that the instances share one queue.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            var connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "localhost";

            using (var connection = connectionFactory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                const string exchangeName = "DirectExchange";
                const string workQueue = "DirectExchangeQueue";

                // Creates the exchange if needed
                channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);

                // Creates the queue if needed
                channel.QueueDeclare(queue: workQueue, durable: true, exclusive: false, autoDelete: false, arguments: null);

                // Every worker binds the same queue with the same (empty) key because all of them
                // handle the same kind of work. Separate routes would most probably call for a
                // server-named queue instead, e.g. channel.QueueDeclare().QueueName.
                channel.QueueBind(queue: workQueue, exchange: exchangeName, routingKey: "");

                Console.WriteLine("Waiting for messages...");

                var worker = new EventingBasicConsumer(channel);
                worker.Received += OnDelivery;

                // autoAck is off: OnDelivery acknowledges explicitly when the work is done
                channel.BasicConsume(queue: workQueue, autoAck: false, consumer: worker);

                Console.WriteLine("Press any key to exit");
                Console.ReadLine();

                // Handles one delivery: print it, simulate the work, then acknowledge it.
                void OnDelivery(object sender, BasicDeliverEventArgs delivery)
                {
                    var text = Encoding.UTF8.GetString(delivery.Body.ToArray());
                    Console.WriteLine("Received message: {0}", text);
                    Console.WriteLine("Processing...");
                    Thread.Sleep(TimeSpan.FromSeconds(3)); // simulate some work
                    Console.WriteLine("Processing Complete");
                    channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
                }
            }
        }
    }
}

您的问题在于对 Direct 交换机工作原理的理解,可能还包括队列的 binding key 和 routing key。Direct 交换机会把消息投递到 binding key 与消息的 routing key 完全匹配的队列。binding key 是队列绑定到交换机时所使用的键;routing key 是随消息一起发送、交换机据以路由该消息的键。

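因此您的代码的问题是:发布消息时使用的 routing key(队列名)与绑定队列时使用的 binding key(空字符串)不一致。可以用下面两种方式之一解决:

  • 发布消息时,把 routing key 改成与 binding key 相同的空字符串: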
...

 channel.BasicPublish(exchange: "",
                        routingKey: "", // must equal the binding key passed to QueueBind (an empty string in the worker)
                        basicProperties: properties,
                        body: body);

...
  • 或者在绑定队列时,把 binding key 改为队列名:
...

 channel.QueueBind(queue: queueName,
                        exchange: "DirectExchange",
                        routingKey: queueName); // Bind with the queue name so the binding key matches
                                                // the routingKey the emitter passes to BasicPublish

...

你可以选择你喜欢的那个。


除此之外,您发布消息时使用的是空的交换机名称(即默认交换机),而不是您声明的交换机,应该改为:

channel.BasicPublish(exchange: "DirectExchange", // the exchange created with ExchangeDeclare, instead of ""
                        routingKey: queueName,
                        basicProperties: properties,
                        body: body);


我建议阅读更多有关 RabbitMQ 的内容,并了解更多有关其实施的概念。 您可能想查看有关以下内容的参考资料:

希望对您有所帮助。

在阅读了 itama 的回答后,我重新检查了一遍代码:首先,BasicPublish 的 exchange 参数是空字符串,这当然行不通!另外,我决定不把队列名用作路由键,而是让路由键保持为空字符串。

我想要的是:一个交换机对应一个队列,多个工作者每次各自从队列中取出一项进行处理。

当我在 worker 上添加这段代码时,它只由一个工作人员处理的问题似乎得到了解决。

// Prefetch count 1: the broker dispatches one message at a time to this worker and
// waits for its acknowledgement before sending the next one.
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

添加这个后,其他工作人员各收到一条消息。我最终向队列中添加了 50-100 条消息,然后启动了 10 个工作人员,他们都得到了一个队列项并对其进行了处理,这很高兴看到。

至于发布确认的 BasicAcks 和 BasicNacks 回调,我找到了这个链接:https://rianjs.net/2013/12/publisher-confirms-with-rabbitmq-and-c-sharp 。它指出需要在调用 BasicPublish 之后调用 channel.WaitForConfirmsOrDie();而我之前的代码并没有等待确认。

完整代码如下:

emitter/producer:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace PublishConfirms.Emit
{
    /// <summary>
    /// Console publisher: declares the exchange, queue and binding, publishes one persistent
    /// message through "DirectExchange" and blocks until the broker has confirmed it.
    /// </summary>
    class Program
    {
        private const string ExchangeName = "DirectExchange";
        private const string QueueName = "DirectExchangeQueue";

        // On a direct exchange the publish routing key must match the binding key exactly,
        // so both the QueueBind and the BasicPublish below use this single constant.
        private const string RoutingKey = "";

        // Upper bound for the confirm wait, so an unresponsive broker cannot block the publisher forever.
        private static readonly TimeSpan ConfirmTimeout = TimeSpan.FromSeconds(5);

        // Messages that have been published but not yet confirmed, keyed by publish sequence number.
        public static readonly ConcurrentDictionary<ulong, string> _outstandingConfirms = new ConcurrentDictionary<ulong, string>();

        /// <summary>
        /// Connects to the broker on localhost, publishes a single message and waits for its confirm.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost"
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // -------------------------------------------------------------------
                    // Setup the Exchange and Queue then bind the queue to the exchange
                    // -------------------------------------------------------------------

                    // This will create the exchange if needed
                    channel.ExchangeDeclare(exchange: ExchangeName,
                        type: ExchangeType.Direct);

                    // Make sure to create the queue in case it doesn't exist
                    channel.QueueDeclare(queue: QueueName,
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    channel.QueueBind(queue: QueueName,
                        exchange: ExchangeName,
                        routingKey: RoutingKey);

                    // -------------------------------------------------------------------
                    // Setup Publish Confirms
                    // -------------------------------------------------------------------
                    channel.BasicAcks += (sender, ea) =>
                    {
                        Console.WriteLine("Message Acknowledged with Delivery Tag {0}", ea.DeliveryTag);
                        // The message is confirmed, so stop tracking it
                        CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                    };

                    channel.BasicNacks += (sender, ea) =>
                    {
                        // The message was nack-ed: look up its body for the log line, then stop tracking it.
                        // When the tag is not tracked, TryGetValue leaves body as null.
                        _outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
                        Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
                        CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                    };

                    // We need to enable publisher confirms on the channel
                    channel.ConfirmSelect();

                    // -------------------------------------------------------------------
                    // Setup the message and add it to the ConcurrentDictionary, so we can
                    // remove it when BasicAcks is raised
                    // -------------------------------------------------------------------
                    var message = "YOUR MESSAGE HERE";
                    var body = Encoding.UTF8.GetBytes(message);

                    var nextPublishSequenceNo = channel.NextPublishSeqNo;
                    Console.WriteLine("Next Publish Sequenece Number: {0}", nextPublishSequenceNo);
                    _outstandingConfirms.TryAdd(nextPublishSequenceNo, message);

                    // Mark the message as persistent (slower but safer than a transient message)
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    channel.BasicPublish(exchange: ExchangeName,
                        routingKey: RoutingKey,
                        basicProperties: properties,
                        body: body);

                    // Block until the broker has confirmed the publish. Without this wait the channel
                    // is disposed before the ack/nack callbacks get a chance to run. The call throws
                    // when a message is nack-ed or when the timeout elapses.
                    channel.WaitForConfirmsOrDie(ConfirmTimeout);
                    Console.WriteLine("Sent message '{0}'", message);
                }
            }
        }

        /// <summary>
        /// Removes confirmed (or nack-ed) entries from <c>_outstandingConfirms</c>.
        /// </summary>
        /// <param name="sequenceNumber">Delivery tag reported by the ack/nack callback.</param>
        /// <param name="multiple">
        /// When true, every tracked entry whose key is less than or equal to
        /// <paramref name="sequenceNumber"/> is removed; otherwise only that single key.
        /// </param>
        static void CleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
        {
            if (!multiple)
            {
                _outstandingConfirms.TryRemove(sequenceNumber, out _);
                return;
            }

            foreach (var entry in _outstandingConfirms.Where(k => k.Key <= sequenceNumber))
            {
                _outstandingConfirms.TryRemove(entry.Key, out _);
            }
        }
    }
}

receiver/worker:

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace PublishConfirms.Receive
{
    /// <summary>
    /// Console worker: consumes "DirectExchangeQueue" one message at a time and acknowledges
    /// each message once a simulated three-second job has finished.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point. Declares the exchange and queue, limits prefetch to one message, binds the
        /// queue, subscribes a consumer with manual acknowledgements and runs until Enter is pressed.
        /// Intended to be started several times so that the instances share one queue.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            var connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "localhost";

            using (var connection = connectionFactory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                const string exchangeName = "DirectExchange";
                const string workQueue = "DirectExchangeQueue";

                // -------------------------------------------------------------------
                // Exchange, queue, prefetch limit and binding
                // -------------------------------------------------------------------

                // Creates the exchange if needed
                channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);

                // Creates the queue if needed
                channel.QueueDeclare(queue: workQueue, durable: true, exclusive: false, autoDelete: false, arguments: null);

                // Dispatch only one message at a time to a worker and wait for its acknowledgement.
                // Adding this corrected the issue where a single worker picked up all queued items.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                // Every worker binds the same queue with the same (empty) key because all of them
                // handle the same kind of work. Separate routes would most probably call for a
                // server-named queue instead, e.g. channel.QueueDeclare().QueueName.
                channel.QueueBind(queue: workQueue, exchange: exchangeName, routingKey: "");

                Console.WriteLine("Waiting for messages...");

                var worker = new EventingBasicConsumer(channel);
                worker.Received += OnDelivery;

                // autoAck is off: OnDelivery acknowledges explicitly when the work is done
                channel.BasicConsume(queue: workQueue, autoAck: false, consumer: worker);

                Console.WriteLine("Press any key to exit");
                Console.ReadLine();

                // Handles one delivery: print it, simulate the work, then acknowledge it.
                void OnDelivery(object sender, BasicDeliverEventArgs delivery)
                {
                    var text = Encoding.UTF8.GetString(delivery.Body.ToArray());
                    Console.WriteLine("Received message: {0}", text);
                    Console.WriteLine("Processing...");
                    Thread.Sleep(TimeSpan.FromSeconds(3)); // simulate some work
                    Console.WriteLine("Processing Complete");
                    channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
                }
            }
        }
    }
}