单元测试 MassTransit 时未调用消费者
Consumer not getting called when unit testing MassTransit
我正在尝试为 MassTransit 编写单元测试。在网上查阅资料后,我发现获取总线(bus)的最佳方式是通过 InMemoryTestHarness 来创建它。我添加了自己的消费者和一个 PublishObserver,用来观察最终的行为。
在下面的示例中,我向总线发布了一条 TestRequest 消息,然后我的消费者读取该请求,并向总线发布一条 TestResponse 消息。最后,观察者应当收到该响应。
我不知道问题是否出在我缺少的某些配置上,或者是否有某些任务我没有等待,但请求消息甚至从未到达消费者。
我错过了什么?
测试
/// <summary>
/// Publishes a <see cref="TestRequest"/> through an in-memory test harness and asserts that the
/// consumer saw one request and the publish observer saw one request and one response.
/// </summary>
[TestMethod]
public void RequestResponseBusTest()
{
    var requestConsumer = new TestConsumer();
    var publishObserver = new TestPublishObserver();
    var testHarness = new InMemoryTestHarness();

    // Route messages arriving on "testqueue" to the single shared consumer instance.
    testHarness.OnConfigureInMemoryBus += configurator =>
        configurator.ReceiveEndpoint("testqueue", endpoint => endpoint.Consumer(() => requestConsumer));

    // Attach the observer so every publish on the bus is recorded.
    testHarness.OnConnectObservers += connector => connector.ConnectPublishObserver(publishObserver);

    testHarness.Start().Wait();

    testHarness.Bus.Publish(new TestRequest() { X = 99 }).Wait();

    // NOTE(review): the assertions run as soon as the Publish task completes; nothing in this
    // method waits for the consumer to handle the message.
    Assert.AreEqual(1, requestConsumer.ConsumedMessages.Count, "consumed");
    Assert.AreEqual(1, publishObserver.PublishedRequests.Count, "requests");
    Assert.AreEqual(1, publishObserver.PublishedResponses.Count, "responses");
}
以及辅助类:
/// <summary>
/// Request message published by the test; carries a single integer payload.
/// </summary>
[Serializable]
public class TestRequest
{
    /// <summary>Integer payload of the request.</summary>
    public int X { get; set; }
}
/// <summary>
/// Response message published by the consumer; carries a single integer payload.
/// </summary>
[Serializable]
public class TestResponse
{
    /// <summary>Integer payload of the response.</summary>
    public int Y { get; set; }
}
/// <summary>
/// Consumer that records every <see cref="TestRequest"/> it receives and publishes a
/// <see cref="TestResponse"/> for each one.
/// </summary>
public class TestConsumer : IConsumer<TestRequest>
{
    /// <summary>Requests received so far, in the order <see cref="Consume"/> was called.</summary>
    public List<TestRequest> ConsumedMessages { get; } = new List<TestRequest>();

    /// <summary>
    /// Records the incoming request and publishes a <see cref="TestResponse"/> with <c>Y = 123</c>.
    /// </summary>
    /// <param name="context">Consume context carrying the request message.</param>
    /// <returns>The task of the response publish; it completes when that publish completes.</returns>
    public Task Consume(ConsumeContext<TestRequest> context)
    {
        ConsumedMessages.Add(context.Message);

        // Hand the publish task back to the caller instead of blocking on it with Wait():
        // the caller can await it, and no thread is held while the publish is in flight.
        return context.Publish(new TestResponse() { Y = 123 });
    }
}
/// <summary>
/// Publish observer that collects the <see cref="TestRequest"/> and <see cref="TestResponse"/>
/// messages reported to <see cref="PostPublish{T}"/>.
/// </summary>
private class TestPublishObserver : IPublishObserver
{
    /// <summary>Requests seen by <see cref="PostPublish{T}"/>.</summary>
    public List<TestRequest> PublishedRequests { get; } = new List<TestRequest>();

    /// <summary>Responses seen by <see cref="PostPublish{T}"/>.</summary>
    public List<TestResponse> PublishedResponses { get; } = new List<TestResponse>();

    /// <summary>No-op; nothing is recorded before a publish.</summary>
    public Task PrePublish<T>(PublishContext<T> context) where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stores the published message in the matching list when it is a
    /// <see cref="TestRequest"/> or a <see cref="TestResponse"/>; other message types are ignored.
    /// </summary>
    public Task PostPublish<T>(PublishContext<T> context) where T : class
    {
        // Go through object so the type test works on the open generic T.
        object payload = context.Message;

        var request = payload as TestRequest;
        if (request != null)
        {
            PublishedRequests.Add(request);
        }

        var response = payload as TestResponse;
        if (response != null)
        {
            PublishedResponses.Add(response);
        }

        return Task.CompletedTask;
    }

    /// <summary>No-op; publish faults are not recorded.</summary>
    public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
    {
        return Task.CompletedTask;
    }
}
你需要在 `bus.Publish(new TestRequest() { X = 99 }).Wait()` 之后添加 `Thread.Sleep(2000);`。
`bus.Publish` 并不保证消息已经被投递给消费者:调用 `Wait()` 时,你只是在等待消息发送完成,而不是等待它被消费者处理。
或者(更可靠的做法),通过 `harness.Consumed` 等待消息被消费之后再进行断言:
/// <summary>
/// Publishes a <see cref="TestRequest"/> through an in-memory test harness, waits for it to be
/// consumed, and asserts that the consumer saw one request and the publish observer saw one
/// request and one response.
/// </summary>
[TestMethod]
public void RequestResponseBusTest()
{
    var harness = new InMemoryTestHarness();
    var consumer = new TestConsumer();
    harness.OnConfigureInMemoryBus += c =>
    {
        // Route messages arriving on "testqueue" to the single shared consumer instance.
        c.ReceiveEndpoint("testqueue", e =>
            e.Consumer(() => consumer));
    };
    var observer = new TestPublishObserver();
    harness.OnConnectObservers += c =>
    {
        c.ConnectPublishObserver(observer);
    };

    harness.Start().Wait();
    try
    {
        var bus = harness.Bus;
        bus.Publish(new TestRequest() { X = 99 }).Wait();

        // Publish completing only means the message was sent. Querying harness.Consumed is
        // expected to wait (up to the harness timeout) until the request has been consumed —
        // see the MassTransit test harness documentation.
        var receivedMessage = harness.Consumed.Select<TestRequest>().FirstOrDefault();
        Assert.IsNotNull(receivedMessage, "request was not consumed");

        Assert.AreEqual(1, consumer.ConsumedMessages.Count, "consumed");
        Assert.AreEqual(1, observer.PublishedRequests.Count, "requests");
        Assert.AreEqual(1, observer.PublishedResponses.Count, "responses");
    }
    finally
    {
        // Always stop the harness so the started bus is shut down even when an assertion fails.
        harness.Stop().Wait();
    }
}
我正在尝试为 MassTransit 编写单元测试。在网上查阅资料后,我发现获取总线(bus)的最佳方式是通过 InMemoryTestHarness 来创建它。我添加了自己的消费者和一个 PublishObserver,用来观察最终的行为。
在下面的示例中,我向总线发布了一条 TestRequest 消息,然后我的消费者读取该请求,并向总线发布一条 TestResponse 消息。最后,观察者应当收到该响应。
我不知道问题是否出在我缺少的某些配置上,或者是否有某些任务我没有等待,但请求消息甚至从未到达消费者。
我错过了什么?
测试
/// <summary>
/// Publishes a <see cref="TestRequest"/> through an in-memory test harness and asserts that the
/// consumer saw one request and the publish observer saw one request and one response.
/// </summary>
[TestMethod]
public void RequestResponseBusTest()
{
    var requestConsumer = new TestConsumer();
    var publishObserver = new TestPublishObserver();
    var testHarness = new InMemoryTestHarness();

    // Route messages arriving on "testqueue" to the single shared consumer instance.
    testHarness.OnConfigureInMemoryBus += configurator =>
        configurator.ReceiveEndpoint("testqueue", endpoint => endpoint.Consumer(() => requestConsumer));

    // Attach the observer so every publish on the bus is recorded.
    testHarness.OnConnectObservers += connector => connector.ConnectPublishObserver(publishObserver);

    testHarness.Start().Wait();

    testHarness.Bus.Publish(new TestRequest() { X = 99 }).Wait();

    // NOTE(review): the assertions run as soon as the Publish task completes; nothing in this
    // method waits for the consumer to handle the message.
    Assert.AreEqual(1, requestConsumer.ConsumedMessages.Count, "consumed");
    Assert.AreEqual(1, publishObserver.PublishedRequests.Count, "requests");
    Assert.AreEqual(1, publishObserver.PublishedResponses.Count, "responses");
}
以及辅助类:
/// <summary>
/// Request message published by the test; carries a single integer payload.
/// </summary>
[Serializable]
public class TestRequest
{
    /// <summary>Integer payload of the request.</summary>
    public int X { get; set; }
}
/// <summary>
/// Response message published by the consumer; carries a single integer payload.
/// </summary>
[Serializable]
public class TestResponse
{
    /// <summary>Integer payload of the response.</summary>
    public int Y { get; set; }
}
/// <summary>
/// Consumer that records every <see cref="TestRequest"/> it receives and publishes a
/// <see cref="TestResponse"/> for each one.
/// </summary>
public class TestConsumer : IConsumer<TestRequest>
{
    /// <summary>Requests received so far, in the order <see cref="Consume"/> was called.</summary>
    public List<TestRequest> ConsumedMessages { get; } = new List<TestRequest>();

    /// <summary>
    /// Records the incoming request and publishes a <see cref="TestResponse"/> with <c>Y = 123</c>.
    /// </summary>
    /// <param name="context">Consume context carrying the request message.</param>
    /// <returns>The task of the response publish; it completes when that publish completes.</returns>
    public Task Consume(ConsumeContext<TestRequest> context)
    {
        ConsumedMessages.Add(context.Message);

        // Hand the publish task back to the caller instead of blocking on it with Wait():
        // the caller can await it, and no thread is held while the publish is in flight.
        return context.Publish(new TestResponse() { Y = 123 });
    }
}
/// <summary>
/// Publish observer that collects the <see cref="TestRequest"/> and <see cref="TestResponse"/>
/// messages reported to <see cref="PostPublish{T}"/>.
/// </summary>
private class TestPublishObserver : IPublishObserver
{
    /// <summary>Requests seen by <see cref="PostPublish{T}"/>.</summary>
    public List<TestRequest> PublishedRequests { get; } = new List<TestRequest>();

    /// <summary>Responses seen by <see cref="PostPublish{T}"/>.</summary>
    public List<TestResponse> PublishedResponses { get; } = new List<TestResponse>();

    /// <summary>No-op; nothing is recorded before a publish.</summary>
    public Task PrePublish<T>(PublishContext<T> context) where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stores the published message in the matching list when it is a
    /// <see cref="TestRequest"/> or a <see cref="TestResponse"/>; other message types are ignored.
    /// </summary>
    public Task PostPublish<T>(PublishContext<T> context) where T : class
    {
        // Go through object so the type test works on the open generic T.
        object payload = context.Message;

        var request = payload as TestRequest;
        if (request != null)
        {
            PublishedRequests.Add(request);
        }

        var response = payload as TestResponse;
        if (response != null)
        {
            PublishedResponses.Add(response);
        }

        return Task.CompletedTask;
    }

    /// <summary>No-op; publish faults are not recorded.</summary>
    public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
    {
        return Task.CompletedTask;
    }
}
你需要在 `bus.Publish(new TestRequest() { X = 99 }).Wait()` 之后添加 `Thread.Sleep(2000);`。`bus.Publish` 并不保证消息已经被投递给消费者:调用 `Wait()` 时,你只是在等待消息发送完成,而不是等待它被消费者处理。
或者(更可靠的做法),通过 `harness.Consumed` 等待消息被消费之后再进行断言:
/// <summary>
/// Publishes a <see cref="TestRequest"/> through an in-memory test harness, waits for it to be
/// consumed, and asserts that the consumer saw one request and the publish observer saw one
/// request and one response.
/// </summary>
[TestMethod]
public void RequestResponseBusTest()
{
    var harness = new InMemoryTestHarness();
    var consumer = new TestConsumer();
    harness.OnConfigureInMemoryBus += c =>
    {
        // Route messages arriving on "testqueue" to the single shared consumer instance.
        c.ReceiveEndpoint("testqueue", e =>
            e.Consumer(() => consumer));
    };
    var observer = new TestPublishObserver();
    harness.OnConnectObservers += c =>
    {
        c.ConnectPublishObserver(observer);
    };

    harness.Start().Wait();
    try
    {
        var bus = harness.Bus;
        bus.Publish(new TestRequest() { X = 99 }).Wait();

        // Publish completing only means the message was sent. Querying harness.Consumed is
        // expected to wait (up to the harness timeout) until the request has been consumed —
        // see the MassTransit test harness documentation.
        var receivedMessage = harness.Consumed.Select<TestRequest>().FirstOrDefault();
        Assert.IsNotNull(receivedMessage, "request was not consumed");

        Assert.AreEqual(1, consumer.ConsumedMessages.Count, "consumed");
        Assert.AreEqual(1, observer.PublishedRequests.Count, "requests");
        Assert.AreEqual(1, observer.PublishedResponses.Count, "responses");
    }
    finally
    {
        // Always stop the harness so the started bus is shut down even when an assertion fails.
        harness.Stop().Wait();
    }
}