request/response 未调用 MassTransit 故障消费者
MassTransit fault consumer not invoked for request/response
关于 Request/Response 模式,在 MassTransit 3+ 中处理异常的最佳做法是什么? docs here 提到如果消息上存在 ResponseAddress,则 Fault 消息将发送到该地址,但是如何 consumer/receive 该地址的消息? Bus.Request 的 ResponseAddress 似乎是我无法控制的自动生成的 MassTransit 地址,因此我不知道如何访问主要消费者抛出的异常。我错过了什么?这是我使用 Unity 容器注册消费者及其故障消费者的代码:
cfg.ReceiveEndpoint(host, "request_response_queue", e =>
{
e.Consumer<IConsumer<IRequestResponse>>(container);
e.Consumer(() => container.Resolve<IMessageFaultConsumer<IRequestResponse>>() as IConsumer<Fault<IRequestResponse>>);
});
这是我对全局消息错误消费者的尝试:
public interface IMessageFaultConsumer<TMessage>
{
}
public class MessageFaultConsumer<TMessage> : IConsumer<Fault<TMessage>>, IMessageFaultConsumer<TMessage>
{
public Task Consume(ConsumeContext<Fault<TMessage>> context)
{
Console.WriteLine("MessageFaultConsumer");
return Task.FromResult(0);
}
}
当我使用 Bus.Publish 而不是 Bus.Request 时,这种方法确实有效。我还研究了创建一个 IConsumeObserver 并将我的全局异常记录代码放入 ConsumeFault 方法中,但这有一个缺点,即在重试放弃之前调用每个异常。处理 request/response 异常的正确方法是什么?
首先,MassTransit 中的 request/response 支持旨在与 .Request()
方法或请求客户端(MessageRequestClient
或 PublishRequestClient
)一起使用。使用这些方法,如果请求消息的消费者抛出异常,该异常将被打包到 Fault<T>
中,然后发送到 ResponseAddress
。由于 .Request()
方法和请求客户端都是异步的,因此使用 await 将抛出异常,并包含来自故障的异常数据。这就是它的设计方式,等待请求,它要么完成,要么超时,要么出错(等待时抛出异常)。
如果您尝试放入一些全局 "exception handler" 代码用于日志记录,您确实应该在服务边界记录这些代码,而观察者是处理它的最佳方式。这样,您只需实施 ConsumeFault
方法,并记录到您的事件接收器。但是,这在消费者管道中是同步的,因此请注意可能引入的延迟。
另一种选择当然是只使用 Fault<T>
,但正如您所提到的,当请求客户端与 header 中的响应地址一起使用时,它不会被发布。在这种情况下,也许您的请求者应该发布一个事件,指示操作 X 出错,并且您可以记录该事件——在业务上下文级别与服务级别。
这里有很多选项,只是选择最适合您的用例。
关于 Request/Response 模式,在 MassTransit 3+ 中处理异常的最佳做法是什么? docs here 提到如果消息上存在 ResponseAddress,则 Fault 消息将发送到该地址,但是如何 consumer/receive 该地址的消息? Bus.Request 的 ResponseAddress 似乎是我无法控制的自动生成的 MassTransit 地址,因此我不知道如何访问主要消费者抛出的异常。我错过了什么?这是我使用 Unity 容器注册消费者及其故障消费者的代码:
cfg.ReceiveEndpoint(host, "request_response_queue", e =>
{
e.Consumer<IConsumer<IRequestResponse>>(container);
e.Consumer(() => container.Resolve<IMessageFaultConsumer<IRequestResponse>>() as IConsumer<Fault<IRequestResponse>>);
});
这是我对全局消息错误消费者的尝试:
public interface IMessageFaultConsumer<TMessage>
{
}
public class MessageFaultConsumer<TMessage> : IConsumer<Fault<TMessage>>, IMessageFaultConsumer<TMessage>
{
public Task Consume(ConsumeContext<Fault<TMessage>> context)
{
Console.WriteLine("MessageFaultConsumer");
return Task.FromResult(0);
}
}
当我使用 Bus.Publish 而不是 Bus.Request 时,这种方法确实有效。我还研究了创建一个 IConsumeObserver 并将我的全局异常记录代码放入 ConsumeFault 方法中,但这有一个缺点,即在重试放弃之前调用每个异常。处理 request/response 异常的正确方法是什么?
首先,MassTransit 中的 request/response 支持旨在与 .Request()
方法或请求客户端(MessageRequestClient
或 PublishRequestClient
)一起使用。使用这些方法,如果请求消息的消费者抛出异常,该异常将被打包到 Fault<T>
中,然后发送到 ResponseAddress
。由于 .Request()
方法和请求客户端都是异步的,因此使用 await 将抛出异常,并包含来自故障的异常数据。这就是它的设计方式,等待请求,它要么完成,要么超时,要么出错(等待时抛出异常)。
如果您尝试放入一些全局 "exception handler" 代码用于日志记录,您确实应该在服务边界记录这些代码,而观察者是处理它的最佳方式。这样,您只需实施 ConsumeFault
方法,并记录到您的事件接收器。但是,这在消费者管道中是同步的,因此请注意可能引入的延迟。
另一种选择当然是只使用 Fault<T>
,但正如您所提到的,当请求客户端与 header 中的响应地址一起使用时,它不会被发布。在这种情况下,也许您的请求者应该发布一个事件,指示操作 X 出错,并且您可以记录该事件——在业务上下文级别与服务级别。
这里有很多选项,只是选择最适合您的用例。