MassTransit 3 如何将消息显式发送到错误队列

MassTransit 3 How to send a message explicitly to the error queue

我正在使用 MassTransit 和 Reactive Extensions 来分批传输来自队列的消息。由于行为与普通消费者不同,如果失败 x 次,我需要能够将消息发送到错误队列。

我查看了 MassTransit 源代码并在 google 组中发帖,但找不到答案。

这在 ConsumeContext 界面上可用吗?或者这甚至可能吗?

这是我的代码。我删除了一些内容以使其更简单。

_busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
    {
        h.Username("guest");
        h.Password("guest");

    });

    cfg.UseInMemoryScheduler();

    cfg.ReceiveEndpoint(host, "customer_update_queue", e =>
    {
       var _observer = new ObservableObserver<ConsumeContext<Customer>>();

       _observer.Buffer(TimeSpan.FromMilliseconds(1000)).Subscribe(OnNext);

       e.Observer(_observer);       
   });
});


 private void OnNext(IList<ConsumeContext<Customer>> messages)
 {
    foreach (var consumeContext in messages)
    {
        Console.WriteLine("Content: " + consumeContext.Message.Content);

        if (consumeContext.Message.RetryCount > 3)
        {
            // I want to be able to send to the error queue     
            consumeContext.SendToErrorQueue()
        }
    }            
 }

我找到了将 RabbitMQ 客户端与 MassTransit 结合使用的变通方法。由于我在使用 Observable 时无法抛出异常,因此不会创建错误队列。我使用 RabbitMQ 客户端手动创建它,如下所示。

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.UserName = "guest";
factory.Password = "guest";

using (IConnection connection = factory.CreateConnection())
{
    using (IModel model = connection.CreateModel())
    {
        string exchangeName = "customer_update_queue_error";
        string queueName = "customer_update_queue_error";
        string routingKey = "";

        model.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
        model.QueueDeclare(queueName, false, false, false, null);
        model.QueueBind(queueName, exchangeName, routingKey);
    }
}

发送部分就是像这样失败x次就直接发送到消息队列。

consumeContext.Send(new Uri("rabbitmq://localhost/customer_update_queue_error"), consumeContext.Message);

希望批处理功能能尽快实现,我可以用它来代替。 https://github.com/MassTransit/MassTransit/issues/800