将 RabbitMQ 与 .NET Core WebAPI 结合使用时丢失消息
Losing messages when using RabbitMQ with .NET Core WebAPI
我正在尝试使用 RabbitMQ 从两个网络 API 接收和发送简单消息。现在这是一个非常简单的代码,我正在尝试查看 API 是否能够正确地相互通信。问题是我没有收到所有消息,也无法在我丢失的消息和收到的消息之间建立模式。
下面是示例代码。
用于发送消息
public class QueueController : Controller
{
[HttpGet]
[Route("send")]
public async Task<IActionResult> Send()
{
QueueManager.Send();
return Ok();
}
}
public class QueueManager
{
public static string queueName = "test-queue";
public static int count = 0;
public static void Send()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
Using (var channel = connection.CreateModel())
{
var queue = channel.QueueDeclare(queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
count++;
var message = new { Message = "Sent Message", count = count };
var body = JsonSerializer.Serialize(message);
var queueMessage = Encoding.UTF8.GetBytes(body);
channel.BasicPublish("", queueName, null, queueMessage);
}
}
}
用于接收消息
public class QueueController : Controller
{
[HttpGet]
[Route("receive")]
public async Task<IActionResult> Receive()
{
QueueManager.Receive();
return Ok();
}
}
public class QueueManager
{
public static string queueName = "test-queue";
public static void Receive()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var msg = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(msg);
Console.WriteLine(message);
};
channel.BasicConsume(queueName, true, consumer);
}
}
}
当我检查接收器的控制台时 API,消息计数是随机的。例如,当我发送 7 条消息时,我收到的消息是 2,3 和 7。所以我丢失了 7 条消息中的 4 条。不知道这里出了什么问题。
另外,当我检查管理控制台时,我可以看到只有当我在接收器 API 中调用端点时队列才被清空,但是该消息仍然没有出现在控制台中。任何帮助将不胜感激。
我创建了两个解决方案并在其中使用了您的代码,但它不起作用。我改成这样了
你的制作人class:
public class QueueManager2
{
public static string queueName = "test-queue";
public static int count = 0;
public static void Send()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
count++;
var message = new { Message = "Sent Message", count = count };
var body = JsonSerializer.Serialize(message);
var queueMessage = Encoding.UTF8.GetBytes(body);
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: null,
body: queueMessage);
}
}
}
和您的消费者class:
public class QueueManager2
{
public static string queueName = "test-queue";
public static void Receive()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var rabbitMqConnection = factory.CreateConnection();
var rabbitMqChannel = rabbitMqConnection.CreateModel();
rabbitMqChannel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
rabbitMqChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
int messageCount = Convert.ToInt16(rabbitMqChannel.MessageCount(queueName));
var consumer = new EventingBasicConsumer(rabbitMqChannel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
rabbitMqChannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
rabbitMqChannel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);
}
}
现在,如果您 运行 您的项目,您可以生成一条消息并使用它。实际上,对于消费者来说,最好有一个始终处于 运行ning 模式的托管服务。有一个端点来获取消息不是一个好主意
我正在尝试使用 RabbitMQ 从两个网络 API 接收和发送简单消息。现在这是一个非常简单的代码,我正在尝试查看 API 是否能够正确地相互通信。问题是我没有收到所有消息,也无法在我丢失的消息和收到的消息之间建立模式。 下面是示例代码。
用于发送消息
public class QueueController : Controller
{
[HttpGet]
[Route("send")]
public async Task<IActionResult> Send()
{
QueueManager.Send();
return Ok();
}
}
public class QueueManager
{
public static string queueName = "test-queue";
public static int count = 0;
public static void Send()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
Using (var channel = connection.CreateModel())
{
var queue = channel.QueueDeclare(queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
count++;
var message = new { Message = "Sent Message", count = count };
var body = JsonSerializer.Serialize(message);
var queueMessage = Encoding.UTF8.GetBytes(body);
channel.BasicPublish("", queueName, null, queueMessage);
}
}
}
用于接收消息
public class QueueController : Controller
{
[HttpGet]
[Route("receive")]
public async Task<IActionResult> Receive()
{
QueueManager.Receive();
return Ok();
}
}
public class QueueManager
{
public static string queueName = "test-queue";
public static void Receive()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var msg = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(msg);
Console.WriteLine(message);
};
channel.BasicConsume(queueName, true, consumer);
}
}
}
当我检查接收器的控制台时 API,消息计数是随机的。例如,当我发送 7 条消息时,我收到的消息是 2,3 和 7。所以我丢失了 7 条消息中的 4 条。不知道这里出了什么问题。 另外,当我检查管理控制台时,我可以看到只有当我在接收器 API 中调用端点时队列才被清空,但是该消息仍然没有出现在控制台中。任何帮助将不胜感激。
我创建了两个解决方案并在其中使用了您的代码,但它不起作用。我改成这样了
你的制作人class:
public class QueueManager2
{
public static string queueName = "test-queue";
public static int count = 0;
public static void Send()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
count++;
var message = new { Message = "Sent Message", count = count };
var body = JsonSerializer.Serialize(message);
var queueMessage = Encoding.UTF8.GetBytes(body);
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: null,
body: queueMessage);
}
}
}
和您的消费者class:
public class QueueManager2
{
public static string queueName = "test-queue";
public static void Receive()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var rabbitMqConnection = factory.CreateConnection();
var rabbitMqChannel = rabbitMqConnection.CreateModel();
rabbitMqChannel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
rabbitMqChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
int messageCount = Convert.ToInt16(rabbitMqChannel.MessageCount(queueName));
var consumer = new EventingBasicConsumer(rabbitMqChannel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
rabbitMqChannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
rabbitMqChannel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);
}
}
现在,如果您 运行 您的项目,您可以生成一条消息并使用它。实际上,对于消费者来说,最好有一个始终处于 运行ning 模式的托管服务。有一个端点来获取消息不是一个好主意