RabbitMQ - 了解消费者已启动

RabbitMQ - Knowing Consumer is up

我有几个关于在 RabbitMQ 中编码的问题...我对这个世界并不陌生,并且根据提供给我的设计提出了一些问题...

设计表明发布者需要等待并知道消费者的处理何时完成才能在客户端执行某些任务(取决于成功/失败)。

我已经尝试了下面的代码,但是在我没有发送任何 Ack 或 Nack 的情况下,dequeue 立即从队列中删除了消息。我很困惑

出版商代码:

using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("test", durable, false, false, null);
                    channel.TxSelect();
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);

                    string message = "Hello World!";
                    var body = Encoding.UTF8.GetBytes(message);



                    channel.BasicPublish("", "test", properties, body);
                    channel.TxCommit();
                    Console.WriteLine(" [x] Sent {0}", message);
                }
            }

消费者代码

 using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("test", durable, false, false, null);

                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("test", true, consumer);

                Console.WriteLine(" [*] Waiting for messages." +
                                         "To exit press CTRL+C");
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);
                }
            }
        }

注意:我意识到 channel.BasicConsume("test", true, consumer); 的 noAck 为真。我将其更改为 channel.BasicConsume("test", false, consumer);

当我使用时,我可以看到消息已从队列中删除 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 但是,Publisher 如何知道 Consumer 已成功处理它?

这是一个重复的问题。检查此线程,这是一个具有正确答案的类似问题:Why doesn't Channel.waitForConfirmsOrDie block?

How do I ensure that Publisher sends message to Server only when Consumer is ready for processing?

你不能。更重要的是,你不应该这样做。使用消息传递架构的目的是忘记这类问题。检查 this.

此外,RabbitMQ 会为您存储这些消息,直到有人准备好处理它们(如果队列是持久的)。

But, how does Publisher know that the Consumer processed it successfully?

不,不会。 ack 仅在 RabbitMQ 和您的消费者之间或 RabbitMQ 和您的生产者之间。查看 this 了解有关 ack/nack 的一些详细信息。

如果我能正确理解你的话,你想在这里实现的是一种 "chatty" 架构,其中消费者也是发布者 "response messages" 的生产者,消息如 "hey, I'm done with message XX, everything is ok".

最简单的方法是将您的消费者设置为生产者,并将您的生产者设置为消费者。您只需向消息添加一个 guid 或某种唯一 ID,完成后,您将在另一个队列上发送一条消息,该消息作为内容通知原始发布者(消费者)这个 "response" 队列)工作已成功完成。

希望对您有所帮助:)

或者您可以使用 RPC 模式

https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html