将 RabbitMQ 与 .NET Core WebAPI 结合使用时丢失消息

Losing messages when using RabbitMQ with .NET Core WebAPI

我正在尝试使用 RabbitMQ 从两个网络 API 接收和发送简单消息。现在这是一个非常简单的代码,我正在尝试查看 API 是否能够正确地相互通信。问题是我没有收到所有消息,也无法在我丢失的消息和收到的消息之间建立模式。 下面是示例代码。

用于发送消息

public class QueueController : Controller
    {
        [HttpGet]
        [Route("send")]
        public async Task<IActionResult> Send()
        {
            QueueManager.Send();
            return Ok();
        }
   }

public class QueueManager
    {
        public static string queueName = "test-queue";
        public static int count = 0;
        public static void Send()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            Using (var channel = connection.CreateModel())
            {
            var queue = channel.QueueDeclare(queueName,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
            count++;
            var message = new { Message = "Sent Message", count = count };
            var body = JsonSerializer.Serialize(message);
            var queueMessage = Encoding.UTF8.GetBytes(body);
            channel.BasicPublish("", queueName, null, queueMessage);
            }
        }
    }

用于接收消息

public class QueueController : Controller
    {
        [HttpGet]
        [Route("receive")]
        public async Task<IActionResult> Receive()
        {
            QueueManager.Receive();
            return Ok();
        }
    }

public class QueueManager
    {
        public static string queueName = "test-queue";
        public static void Receive()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queueName,
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var msg = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(msg);
                    Console.WriteLine(message);
                };
                channel.BasicConsume(queueName, true, consumer);
            }
        }
    }

当我检查接收器的控制台时 API,消息计数是随机的。例如,当我发送 7 条消息时,我收到的消息是 2,3 和 7。所以我丢失了 7 条消息中的 4 条。不知道这里出了什么问题。 另外,当我检查管理控制台时,我可以看到只有当我在接收器 API 中调用端点时队列才被清空,但是该消息仍然没有出现在控制台中。任何帮助将不胜感激。

我创建了两个解决方案并在其中使用了您的代码,但它不起作用。我改成这样了

你的制作人class:

public class QueueManager2
{
    public static string queueName = "test-queue";
    public static int count = 0;
    public static void Send()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: queueName,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            count++;
            var message = new { Message = "Sent Message", count = count };

            var body = JsonSerializer.Serialize(message);
            var queueMessage = Encoding.UTF8.GetBytes(body);
             
            channel.BasicPublish(exchange: "",
                                 routingKey: queueName,
                                 basicProperties: null,
                                 body: queueMessage);
        }
    }
}

和您的消费者class:

public class QueueManager2
{
    public static string queueName = "test-queue";
    public static void Receive()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
         var rabbitMqConnection = factory.CreateConnection();
        var rabbitMqChannel = rabbitMqConnection.CreateModel();

        rabbitMqChannel.QueueDeclare(queue: queueName,
                             durable: false,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        rabbitMqChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        int messageCount = Convert.ToInt16(rabbitMqChannel.MessageCount(queueName));            

        var consumer = new EventingBasicConsumer(rabbitMqChannel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());                
            rabbitMqChannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            
        };
        rabbitMqChannel.BasicConsume(queue: queueName,
                             autoAck: false,
                             consumer: consumer); 
    }
} 

现在,如果您 运行 您的项目,您可以生成一条消息并使用它。实际上,对于消费者来说,最好有一个始终处于 运行ning 模式的托管服务。有一个端点来获取消息不是一个好主意