将观察员添加到已经 运行 个 MassTransit 系统

Adding Observers to already running MassTransit system

我正在尝试向包含 MassTransit 观察器的系统添加微服务,该观察器将观察请求响应或发布已在系统中使用的消息。我无法轻松地重新部署现有服务,所以如果可能的话我宁愿避免它。

以下代码只在服务启动时执行,发送消息时不执行。

                BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
                {
                    var host = cfg.Host(new Uri($"{settings.Protocol}://{settings.RabbitMqHost}/"), h =>
                    {
                        h.Username(settings.RabbitMqConsumerUser);
                        h.Password(settings.RabbitMqConsumerPassword);
                    });

                    cfg.ReceiveEndpoint(host, "pub_sub_flo", ec => { });

                    host.ConnectSendObserver(new RequestObserver());
                    host.ConnectPublishObserver(new RequestObserver());

                });

观察者:

 public class RequestObserver : ISendObserver, IPublishObserver
    {
        public Task PreSend<T>(SendContext<T> context) where T : class
        {
            return Task.CompletedTask;
        }

        public Task PostSend<T>(SendContext<T> context) where T : class
        {
            var proxy = new StoreProxyFactory().CreateProxy("fabric:/MessagePatterns");

            proxy.AddEvent(new ConsumerEvent()
            {
                Id = Guid.NewGuid(),
                ConsumerId = Guid.NewGuid(),
                Message = "AMQPRequestResponse",
                Date = DateTimeOffset.Now,
                Type = "Observer"
            }).Wait();

            return Task.CompletedTask;
        }

        public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
        {
            return Task.CompletedTask;
        }

        public Task PrePublish<T>(PublishContext<T> context) where T : class
        {
            return Task.CompletedTask;
        }

        public Task PostPublish<T>(PublishContext<T> context) where T : class
        {
            var proxy = new StoreProxyFactory().CreateProxy("fabric:/MessagePatterns");

            proxy.AddEvent(new ConsumerEvent()
            {
                Id = Guid.NewGuid(),
                ConsumerId = Guid.NewGuid(),
                Message = "AMQPRequestResponse",
                Date = DateTimeOffset.Now,
                Type = "Observer"
            }).Wait();

            return Task.CompletedTask;
        }

        public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
        {
            return Task.CompletedTask;
        }
    }

有人能帮忙吗?

非常感谢。

观察者只会在它们所连接的总线实例上发送、发布等消息时被调用。他们不会观察其他总线实例发送或接收的消息。

如果您想观察这些消息,您可以创建一个观察者队列并将该队列绑定到您的服务交换,以便将请求消息的副本发送到您的服务。然而,回复并不容易获得,因为它们是通过临时交换直接发送到客户端队列的。

cfg.ReceiveEndpoint(host, "service-observer", e =>
{
    e.Consumer<SomeConsumer>(...);
    e.Bind("service-endpoint");
});

这会将服务端点交换绑定到您的接收端点队列,以便将消息的副本发送给您的消费者。

这通常被称为窃听器。