将观察员添加到已经 运行 个 MassTransit 系统
Adding Observers to already running MassTransit system
我正在尝试向包含 MassTransit 观察器的系统添加微服务,该观察器将观察请求响应或发布已在系统中使用的消息。我无法轻松地重新部署现有服务,所以如果可能的话我宁愿避免它。
以下代码只在服务启动时执行,发送消息时不执行。
BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri($"{settings.Protocol}://{settings.RabbitMqHost}/"), h =>
{
h.Username(settings.RabbitMqConsumerUser);
h.Password(settings.RabbitMqConsumerPassword);
});
cfg.ReceiveEndpoint(host, "pub_sub_flo", ec => { });
host.ConnectSendObserver(new RequestObserver());
host.ConnectPublishObserver(new RequestObserver());
});
观察者:
public class RequestObserver : ISendObserver, IPublishObserver
{
public Task PreSend<T>(SendContext<T> context) where T : class
{
return Task.CompletedTask;
}
public Task PostSend<T>(SendContext<T> context) where T : class
{
var proxy = new StoreProxyFactory().CreateProxy("fabric:/MessagePatterns");
proxy.AddEvent(new ConsumerEvent()
{
Id = Guid.NewGuid(),
ConsumerId = Guid.NewGuid(),
Message = "AMQPRequestResponse",
Date = DateTimeOffset.Now,
Type = "Observer"
}).Wait();
return Task.CompletedTask;
}
public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
{
return Task.CompletedTask;
}
public Task PrePublish<T>(PublishContext<T> context) where T : class
{
return Task.CompletedTask;
}
public Task PostPublish<T>(PublishContext<T> context) where T : class
{
var proxy = new StoreProxyFactory().CreateProxy("fabric:/MessagePatterns");
proxy.AddEvent(new ConsumerEvent()
{
Id = Guid.NewGuid(),
ConsumerId = Guid.NewGuid(),
Message = "AMQPRequestResponse",
Date = DateTimeOffset.Now,
Type = "Observer"
}).Wait();
return Task.CompletedTask;
}
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
{
return Task.CompletedTask;
}
}
有人能帮忙吗?
非常感谢。
观察者只会在它们所连接的总线实例上发送、发布等消息时被调用。他们不会观察其他总线实例发送或接收的消息。
如果您想观察这些消息,您可以创建一个观察者队列并将该队列绑定到您的服务交换,以便将请求消息的副本发送到您的服务。然而,回复并不容易获得,因为它们是通过临时交换直接发送到客户端队列的。
cfg.ReceiveEndpoint(host, "service-observer", e =>
{
e.Consumer<SomeConsumer>(...);
e.Bind("service-endpoint");
});
这会将服务端点交换绑定到您的接收端点队列,以便将消息的副本发送给您的消费者。
这通常被称为窃听器。
我正在尝试向包含 MassTransit 观察器的系统添加微服务,该观察器将观察请求响应或发布已在系统中使用的消息。我无法轻松地重新部署现有服务,所以如果可能的话我宁愿避免它。
以下代码只在服务启动时执行,发送消息时不执行。
BusControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri($"{settings.Protocol}://{settings.RabbitMqHost}/"), h =>
{
h.Username(settings.RabbitMqConsumerUser);
h.Password(settings.RabbitMqConsumerPassword);
});
cfg.ReceiveEndpoint(host, "pub_sub_flo", ec => { });
host.ConnectSendObserver(new RequestObserver());
host.ConnectPublishObserver(new RequestObserver());
});
观察者:
public class RequestObserver : ISendObserver, IPublishObserver
{
public Task PreSend<T>(SendContext<T> context) where T : class
{
return Task.CompletedTask;
}
public Task PostSend<T>(SendContext<T> context) where T : class
{
var proxy = new StoreProxyFactory().CreateProxy("fabric:/MessagePatterns");
proxy.AddEvent(new ConsumerEvent()
{
Id = Guid.NewGuid(),
ConsumerId = Guid.NewGuid(),
Message = "AMQPRequestResponse",
Date = DateTimeOffset.Now,
Type = "Observer"
}).Wait();
return Task.CompletedTask;
}
public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
{
return Task.CompletedTask;
}
public Task PrePublish<T>(PublishContext<T> context) where T : class
{
return Task.CompletedTask;
}
public Task PostPublish<T>(PublishContext<T> context) where T : class
{
var proxy = new StoreProxyFactory().CreateProxy("fabric:/MessagePatterns");
proxy.AddEvent(new ConsumerEvent()
{
Id = Guid.NewGuid(),
ConsumerId = Guid.NewGuid(),
Message = "AMQPRequestResponse",
Date = DateTimeOffset.Now,
Type = "Observer"
}).Wait();
return Task.CompletedTask;
}
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
{
return Task.CompletedTask;
}
}
有人能帮忙吗?
非常感谢。
观察者只会在它们所连接的总线实例上发送、发布等消息时被调用。他们不会观察其他总线实例发送或接收的消息。
如果您想观察这些消息,您可以创建一个观察者队列并将该队列绑定到您的服务交换,以便将请求消息的副本发送到您的服务。然而,回复并不容易获得,因为它们是通过临时交换直接发送到客户端队列的。
cfg.ReceiveEndpoint(host, "service-observer", e =>
{
e.Consumer<SomeConsumer>(...);
e.Bind("service-endpoint");
});
这会将服务端点交换绑定到您的接收端点队列,以便将消息的副本发送给您的消费者。
这通常被称为窃听器。