试图从 MassTransit 消费者那里获取消息
Trying to get the message from the MassTransit consumer
我正在使用 Masstransit 和 RabbitMQ 发布事件(没有消费者,只使用发布者),目前我正在尝试创建一个集成测试来验证消息是否已发布,如果所以,我想检查它是否是正确的信息。为此,我创建了一个消费者来消费队列中的消息并将其与我的预期进行比较。这里的问题是,我无法使用消息。活动已成功发布,但我无法收到消息。
这是负责连接消费者
的class
public class ServiceBusHelper
{
private IBusControl bus;
private readonly string serviceBusQueueName = ConfigurationManager.AppSettings["ServiceBusQueuename"];
private readonly string serviceBusEndpoint = ConfigurationManager.AppSettings["ServiceBusEndPoint"];
private readonly string serviceBusUsername = ConfigurationManager.AppSettings["ServiceBusUsername"];
private readonly string serviceBusPassword = ConfigurationManager.AppSettings["ServiceBusPassword"];
public ConnectHandle HandleObserver { get; set; }
public void ConnectRabbitMQ()
{
bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(serviceBusEndpoint),
h =>
{
h.Username(serviceBusUsername);
h.Password(serviceBusPassword);
});
cfg.ReceiveEndpoint(
serviceBusQueueName,
e =>
{
e.Consumer<ServiceBusEventsHelper>();
});
});
//Observer
var observer = new PublishedObserverHelper();
HandleObserver = bus.ConnectPublishObserver(observer);
}
}
这是将使用消息
的 class
public class ServiceBusEventsHelper : IConsumer<ITransportCreatedEvent>
{
public ITransportCreatedEvent Result { get; set; }
public async Task Consume(ConsumeContext<ITransportCreatedEvent> context)
{
Result = await Task.FromResult(context.Message);
}
}
在测试方法中我有这个
ServiceBusEventsHelper eventHelper = new ServiceBusEventsHelper();
ServiceBusHelper busHelper = new ServiceBusHelper();
try
{
busHelper.ConnectRabbitMQ();
transportResponseDto = await this.transportClient.CreateTransportAsync(transportRequest);
handle = busHelper.HandleObserver;
var eventResponse = eventHelper.Result;// Allways NULL
}
catch (Exception ex)
{
Assert.Fail(ex.GetDetailMessage());
}
我正在尝试获取这样的消息结果
var eventResponse = eventHelper.Result;// Allways NULL
但始终为空。
有人可以帮我吗??
我有一项服务,其中一种方法是 CreateTransportAsync(),在该方法中我调用了 Publish
public async Task<TransportResponseDto> CreateTransportAsync(TransportDto request){
.
.
.
await this.RaiseTransportCreatedEvent(transportResponseDto);
}
private async Task RaiseTransportCreatedEvent(TransportResponseDto transportResponseDto)
{
var evt = CreateTransportEvent(transportResponseDto);
await this.serviceBus.Publish(evt).ConfigureAwait(false);
}
public class ServiceBus<T> : IServiceBus<T> where T : class
{
private readonly IBus bus;
public ServiceBus(IBus bus)
{
this.bus = bus;
}
public Task Publish(T evt)
{
return bus.Publish(evt, evt.GetType());
}
}
这就是我发布活动的方式,而且很有效。现在我正在尝试测试所有这些是否与另一个解决方案中的集成测试一起工作,我正在尝试创建一个消费者来使用队列中的消息。然后,我想验证该消息(将其与 json 文件中的假消息进行比较)以查看是否一切正常。问题是我无法收到该消息,也无法理解发生了什么。
P.S : 我不太明白你在第 3 点想说的是什么。
谢谢
- 您需要通过调用
bus.Start()
来启动总线。你不这样做,所以无论如何都不会收到任何东西。
- 不清楚
transportClient.CreateTransportAsync
的作用。谁在发布消息?
- 消费者根据消费的消息进行实例化。您在 "test" 中所做的 - 您实例化了消费者的一个实例并保留对该实例的引用。然后你在某处发送消息。 MassTransit 为您的消费者创建一个 新实例 ,您更新该字段,然后处理该实例。但是您正在检查您最初创建的实例的
Result
,它从未收到任何消息。它将永远是 null
.
- 将消息从发布者传递到消费者需要时间。您正在尝试在初始化总线后立即检查结果。我很确定你永远不会这么快得到它,即使你修复了 (1)、(2) 和 (3)
我不确定你到底想测试什么。发布和使用消息适用于所有使用 MassTransit 的传输。您可以从 Github 获取任何示例,构建它,运行 它并查看它的工作。
还有许多针对 RabbitMQ 传输的测试展示了如何创建类似的东西。例如,检查 ConsumerBind_Specs.cs 文件。
此外,如果您想使用一些现有的消费者实例,您可以按照文档 Connecting an existing consumer instance 中的描述将此实例连接到总线。使用 e.Instance
而不是 e.Consumer
将使您的测试工作正确等待 Consume
方法完成。但是,这并不是真正流行的方法,因为您确实希望将您的使用者范围限制为仅处理一条消息。
我正在使用 Masstransit 和 RabbitMQ 发布事件(没有消费者,只使用发布者),目前我正在尝试创建一个集成测试来验证消息是否已发布,如果所以,我想检查它是否是正确的信息。为此,我创建了一个消费者来消费队列中的消息并将其与我的预期进行比较。这里的问题是,我无法使用消息。活动已成功发布,但我无法收到消息。
这是负责连接消费者
的classpublic class ServiceBusHelper
{
private IBusControl bus;
private readonly string serviceBusQueueName = ConfigurationManager.AppSettings["ServiceBusQueuename"];
private readonly string serviceBusEndpoint = ConfigurationManager.AppSettings["ServiceBusEndPoint"];
private readonly string serviceBusUsername = ConfigurationManager.AppSettings["ServiceBusUsername"];
private readonly string serviceBusPassword = ConfigurationManager.AppSettings["ServiceBusPassword"];
public ConnectHandle HandleObserver { get; set; }
public void ConnectRabbitMQ()
{
bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(serviceBusEndpoint),
h =>
{
h.Username(serviceBusUsername);
h.Password(serviceBusPassword);
});
cfg.ReceiveEndpoint(
serviceBusQueueName,
e =>
{
e.Consumer<ServiceBusEventsHelper>();
});
});
//Observer
var observer = new PublishedObserverHelper();
HandleObserver = bus.ConnectPublishObserver(observer);
}
}
这是将使用消息
的 classpublic class ServiceBusEventsHelper : IConsumer<ITransportCreatedEvent>
{
public ITransportCreatedEvent Result { get; set; }
public async Task Consume(ConsumeContext<ITransportCreatedEvent> context)
{
Result = await Task.FromResult(context.Message);
}
}
在测试方法中我有这个
ServiceBusEventsHelper eventHelper = new ServiceBusEventsHelper();
ServiceBusHelper busHelper = new ServiceBusHelper();
try
{
busHelper.ConnectRabbitMQ();
transportResponseDto = await this.transportClient.CreateTransportAsync(transportRequest);
handle = busHelper.HandleObserver;
var eventResponse = eventHelper.Result;// Allways NULL
}
catch (Exception ex)
{
Assert.Fail(ex.GetDetailMessage());
}
我正在尝试获取这样的消息结果
var eventResponse = eventHelper.Result;// Allways NULL
但始终为空。
有人可以帮我吗??
我有一项服务,其中一种方法是 CreateTransportAsync(),在该方法中我调用了 Publish
public async Task<TransportResponseDto> CreateTransportAsync(TransportDto request){
.
.
.
await this.RaiseTransportCreatedEvent(transportResponseDto);
}
private async Task RaiseTransportCreatedEvent(TransportResponseDto transportResponseDto)
{
var evt = CreateTransportEvent(transportResponseDto);
await this.serviceBus.Publish(evt).ConfigureAwait(false);
}
public class ServiceBus<T> : IServiceBus<T> where T : class
{
private readonly IBus bus;
public ServiceBus(IBus bus)
{
this.bus = bus;
}
public Task Publish(T evt)
{
return bus.Publish(evt, evt.GetType());
}
}
这就是我发布活动的方式,而且很有效。现在我正在尝试测试所有这些是否与另一个解决方案中的集成测试一起工作,我正在尝试创建一个消费者来使用队列中的消息。然后,我想验证该消息(将其与 json 文件中的假消息进行比较)以查看是否一切正常。问题是我无法收到该消息,也无法理解发生了什么。 P.S : 我不太明白你在第 3 点想说的是什么。 谢谢
- 您需要通过调用
bus.Start()
来启动总线。你不这样做,所以无论如何都不会收到任何东西。 - 不清楚
transportClient.CreateTransportAsync
的作用。谁在发布消息? - 消费者根据消费的消息进行实例化。您在 "test" 中所做的 - 您实例化了消费者的一个实例并保留对该实例的引用。然后你在某处发送消息。 MassTransit 为您的消费者创建一个 新实例 ,您更新该字段,然后处理该实例。但是您正在检查您最初创建的实例的
Result
,它从未收到任何消息。它将永远是null
. - 将消息从发布者传递到消费者需要时间。您正在尝试在初始化总线后立即检查结果。我很确定你永远不会这么快得到它,即使你修复了 (1)、(2) 和 (3)
我不确定你到底想测试什么。发布和使用消息适用于所有使用 MassTransit 的传输。您可以从 Github 获取任何示例,构建它,运行 它并查看它的工作。
还有许多针对 RabbitMQ 传输的测试展示了如何创建类似的东西。例如,检查 ConsumerBind_Specs.cs 文件。
此外,如果您想使用一些现有的消费者实例,您可以按照文档 Connecting an existing consumer instance 中的描述将此实例连接到总线。使用 e.Instance
而不是 e.Consumer
将使您的测试工作正确等待 Consume
方法完成。但是,这并不是真正流行的方法,因为您确实希望将您的使用者范围限制为仅处理一条消息。