request/response 未调用 MassTransit 故障消费者

MassTransit fault consumer not invoked for request/response

关于 Request/Response 模式,在 MassTransit 3+ 中处理异常的最佳做法是什么? docs here 提到如果消息上存在 ResponseAddress,则 Fault 消息将发送到该地址,但是如何 consumer/receive 该地址的消息? Bus.Request 的 ResponseAddress 似乎是我无法控制的自动生成的 MassTransit 地址,因此我不知道如何访问主要消费者抛出的异常。我错过了什么?这是我使用 Unity 容器注册消费者及其故障消费者的代码:

cfg.ReceiveEndpoint(host, "request_response_queue", e =>
{
    e.Consumer<IConsumer<IRequestResponse>>(container);
    e.Consumer(() => container.Resolve<IMessageFaultConsumer<IRequestResponse>>() as IConsumer<Fault<IRequestResponse>>);
});

这是我对全局消息错误消费者的尝试:

public interface IMessageFaultConsumer<TMessage>
{
}

public class MessageFaultConsumer<TMessage> : IConsumer<Fault<TMessage>>, IMessageFaultConsumer<TMessage>
{
    public Task Consume(ConsumeContext<Fault<TMessage>> context)
    {
        Console.WriteLine("MessageFaultConsumer");
        return Task.FromResult(0);
    }
}

当我使用 Bus.Publish 而不是 Bus.Request 时,这种方法确实有效。我还研究了创建一个 IConsumeObserver 并将我的全局异常记录代码放入 ConsumeFault 方法中,但这有一个缺点,即在重试放弃之前调用每个异常。处理 request/response 异常的正确方法是什么?

首先,MassTransit 中的 request/response 支持旨在与 .Request() 方法或请求客户端(MessageRequestClientPublishRequestClient)一起使用。使用这些方法,如果请求消息的消费者抛出异常,该异常将被打包到 Fault<T> 中,然后发送到 ResponseAddress。由于 .Request() 方法和请求客户端都是异步的,因此使用 await 将抛出异常,并包含来自故障的异常数据。这就是它的设计方式,等待请求,它要么完成,要么超时,要么出错(等待时抛出异常)。

如果您尝试放入一些全局 "exception handler" 代码用于日志记录,您确实应该在服务边界记录这些代码,而观察者是处理它的最佳方式。这样,您只需实施 ConsumeFault 方法,并记录到您的事件接收器。但是,这在消费者管道中是同步的,因此请注意可能引入的延迟。

另一种选择当然是只使用 Fault<T>,但正如您所提到的,当请求客户端与 header 中的响应地址一起使用时,它不会被发布。在这种情况下,也许您的请求者应该发布一个事件,指示操作 X 出错,并且您可以记录该事件——在业务上下文级别与服务级别。

这里有很多选项,只是选择最适合您的用例。