MassTransit 3 如何将消息显式发送到错误队列
MassTransit 3 How to send a message explicitly to the error queue
我正在使用 MassTransit 和 Reactive Extensions 来分批传输来自队列的消息。由于行为与普通消费者不同,如果失败 x 次,我需要能够将消息发送到错误队列。
我查看了 MassTransit 源代码并在 google 组中发帖,但找不到答案。
这在 ConsumeContext 界面上可用吗?或者这甚至可能吗?
这是我的代码。我删除了一些内容以使其更简单。
_busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.UseInMemoryScheduler();
cfg.ReceiveEndpoint(host, "customer_update_queue", e =>
{
var _observer = new ObservableObserver<ConsumeContext<Customer>>();
_observer.Buffer(TimeSpan.FromMilliseconds(1000)).Subscribe(OnNext);
e.Observer(_observer);
});
});
private void OnNext(IList<ConsumeContext<Customer>> messages)
{
foreach (var consumeContext in messages)
{
Console.WriteLine("Content: " + consumeContext.Message.Content);
if (consumeContext.Message.RetryCount > 3)
{
// I want to be able to send to the error queue
consumeContext.SendToErrorQueue()
}
}
}
我找到了将 RabbitMQ 客户端与 MassTransit 结合使用的变通方法。由于我在使用 Observable 时无法抛出异常,因此不会创建错误队列。我使用 RabbitMQ 客户端手动创建它,如下所示。
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "guest";
factory.Password = "guest";
using (IConnection connection = factory.CreateConnection())
{
using (IModel model = connection.CreateModel())
{
string exchangeName = "customer_update_queue_error";
string queueName = "customer_update_queue_error";
string routingKey = "";
model.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
model.QueueDeclare(queueName, false, false, false, null);
model.QueueBind(queueName, exchangeName, routingKey);
}
}
发送部分就是像这样失败x次就直接发送到消息队列。
consumeContext.Send(new Uri("rabbitmq://localhost/customer_update_queue_error"), consumeContext.Message);
希望批处理功能能尽快实现,我可以用它来代替。
https://github.com/MassTransit/MassTransit/issues/800
我正在使用 MassTransit 和 Reactive Extensions 来分批传输来自队列的消息。由于行为与普通消费者不同,如果失败 x 次,我需要能够将消息发送到错误队列。
我查看了 MassTransit 源代码并在 google 组中发帖,但找不到答案。
这在 ConsumeContext 界面上可用吗?或者这甚至可能吗?
这是我的代码。我删除了一些内容以使其更简单。
_busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.UseInMemoryScheduler();
cfg.ReceiveEndpoint(host, "customer_update_queue", e =>
{
var _observer = new ObservableObserver<ConsumeContext<Customer>>();
_observer.Buffer(TimeSpan.FromMilliseconds(1000)).Subscribe(OnNext);
e.Observer(_observer);
});
});
private void OnNext(IList<ConsumeContext<Customer>> messages)
{
foreach (var consumeContext in messages)
{
Console.WriteLine("Content: " + consumeContext.Message.Content);
if (consumeContext.Message.RetryCount > 3)
{
// I want to be able to send to the error queue
consumeContext.SendToErrorQueue()
}
}
}
我找到了将 RabbitMQ 客户端与 MassTransit 结合使用的变通方法。由于我在使用 Observable 时无法抛出异常,因此不会创建错误队列。我使用 RabbitMQ 客户端手动创建它,如下所示。
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "guest";
factory.Password = "guest";
using (IConnection connection = factory.CreateConnection())
{
using (IModel model = connection.CreateModel())
{
string exchangeName = "customer_update_queue_error";
string queueName = "customer_update_queue_error";
string routingKey = "";
model.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
model.QueueDeclare(queueName, false, false, false, null);
model.QueueBind(queueName, exchangeName, routingKey);
}
}
发送部分就是像这样失败x次就直接发送到消息队列。
consumeContext.Send(new Uri("rabbitmq://localhost/customer_update_queue_error"), consumeContext.Message);
希望批处理功能能尽快实现,我可以用它来代替。 https://github.com/MassTransit/MassTransit/issues/800