如何最好地等待 RabbitMQ 事件

How best to wait for a RabbitMQ event

我正在为我的基于微服务的应用程序编写集成事件,其中服务通过 RabbitMQ 等消息总线进行解耦和交互。对于集成测试,我使用实际组件而不是模拟任何组件,因此我使用的是实际的 RabbitMQ 服务。

我的一个服务订阅了一个事件,当事件被触发时,它会做一些处理,并通知处理成功。因此,为了测试它,集成测试应该发布服务监听的事件,并等待服务 "success" 事件。 (它在实际应用中更有意义,因此为了清楚起见,我将其简化了一点,因此可能听起来有点笨拙的设计。)

简而言之,测试实现为:

public class ProcessCompletedIntegrationEventHandler: 
    IIntegrationEventHandler<ProcessCompletedIntegrationEvent>
{
    public Task Handle(ProcessCompletedIntegrationEvent @event)
    {
        // When I put a break point here, I can see this method is called 
        // after the service under the test has completed its task and has 
        // published the message.
    }
}

public class IntegrationTests
{
    [Fact]
    public async Task IfReceivesAndPublishesEvents()
    {
        // Arrange
        _rabbitMQBus.Subscribe<
              ProcessCompletedIntegrationEvent,
              ProcessCompletedIntegrationEventHandler>();

        // Act
        Eventbus.Publish(new ProcessIntegrationEvent(name: "test"));

        // Assert
        // how should I assert if `ProcessCompletedIntegrationEventHandler` is called?
    }
}

问题:ProcessCompletedIntegrationEventHandlerHandle 方法应该添加什么以及如何在测试中检查它。

(出于某些原因,我对基于查询 RabbitMQ 并检查相关消息是否存在的断言不感兴趣。)

在这些情况下,您可以通过多种方式进行集成测试。在这样做之前,您应该澄清您到底在测试什么?

状态有变化?

您是否在处理事件后测试某些数据库(或存储)中的状态更改?如果是,那么您可以只从数据库中获取该状态并断言它是否已更改为您期望的状态。在这种情况下,您可以确定 Handler 已被调用并且您的代码具有 运行。如果可能,我建议您使用这种方法。最后不需要测试 RabbitMq 是否正常工作。

状态没有发生变化,但你还想测试Handler是否被执行?

在这种情况下你可以做的是使用依赖注入并注册另一个 "IIntegrationEventHandler" 接口的实现。让我们称之为 "TestEventHandler"。在测试设置中,您可以使用 "TestEventHandler" 的实现并检查它是否已被调用。

Question: what should add to the Handle method of the ProcessCompletedIntegrationEventHandler and how check it in the test.

我不建议做这种测试,但如果你真的出于某种原因想要这样做,我从问题中的信息中不清楚,你可以做一些像 TestEventHandler 这样的事情,它有一个 属性 喜欢:

public bool HandlerCalled { get; set; }

您可以调用 TestEventHandler 的 Handle 方法,该方法会将此 属性 设置为 true,并且您可以从该 Handler 调用 ProcessCompletedIntegrationEventHandler Handle 方法。

(For some reasons, I am not interested in assertions based on querying the RabbitMq and checking if the related message is there.)

在我看来,这将是一种更简洁的方法。除非你有充分的理由不这样做,否则我更喜欢这种方法而不是使用 HandlerCalled 属性 的方法,因为无论如何你都不是在嘲笑 RabbitMq。

请记住,有时在微服务架构中您可以进行集成测试,例如:

  • 单独测试特定的微服务 A。意味着您模拟与其他微服务的所有交互。这样做的好处是编写此类测试更容易,而且在您的 CI 中,您不必设置多个微服务及其所有依赖组件(数据库、队列和其他)。

  • 在不模拟其他微服务的情况下测试完全集成。这意味着您将在 CI 中设置特定测试用例和它们之间的交互所需的所有微服务。优点显然是您的测试正在测试完整的集成并且更加现实。另一方面,缺点是执行这些测试所需的所有依赖组件的资源消耗。

编辑:

第一题

回答后续问题 HandlerCalled 是如何实现的?

这里有一些伪代码只是为了举例:

public class ProcessCompletedIntegrationEventHandler: 
    IIntegrationEventHandler<ProcessCompletedIntegrationEvent>
{
    public Task Handle(ProcessCompletedIntegrationEvent @event)
    {
        // Standard handler logic goes here
    }
}

public class TestEventHandler: 
    IIntegrationEventHandler<ProcessCompletedIntegrationEvent>
{
    private readonly IIntegrationEventHandler<ProcessCompletedIntegrationEvent> realHandler;
    public bool HandlerCalled { get; private set; }

    //This will be injected by the DI container
    public TestEventHandler(ProcessCompletedIntegrationEventHandler realHandler)
    {
        this.realHandler = realHandler;
    }

    public Task Handle(ProcessCompletedIntegrationEvent @event)
    {
        HandlerCalled = true;
        realHandler.Handle(@event)
    }
}

//here you would setup the Depdendency Injection and rest of he stuff regarding your services..
// this will depend on what you are using, ASP.NET Core or something else
public class BaseIntegrationTests
{
    protected IServiceCollection container;

    public BaseIntegrationTests()
    {
        //some setup code here.....
        RegisterDependencies();
    }

    //Register dependencies for DI Container
    private void RegisterDependencies()
    {
        container.AddScoped(ProcessCompletedIntegrationEventHandler, TestEventHandler);

        //other registrations here ......
    }

    protected void RegisterTestEventHandlerInstance(TestEventHandler testEventHandler)
    {
        container.AddSingleton(IIntegrationEventHandler<ProcessCompletedIntegrationEvent>, testEventHandler);
    }
}

public class IntegrationTests : BaseIntegrationTests
{
    private readonly TestEventHandler testEventHandler;

    public IntegrationTests()
    {
        //you could move this code here to the test if you want to use a different instance per test
        testEventHandler = new TestEventHandler();
        RegisterTestEventHandlerInstance(testEventHandler);
    }

    [Fact]
    public async Task IfReceivesAndPublishesEvents()
    {
        // Arrange
        _rabbitMQBus.Subscribe<
              ProcessCompletedIntegrationEvent,
              ProcessCompletedIntegrationEventHandler>();

        // You would need to register dependecies if you use DI here
        // or in some central TestFixture class or similar
        // you would do something like

        //Check if the handler is not called yet, since the event has not been published
        Assert.False(testEventHandler.HandlerCalled);

        // Act
        Eventbus.Publish(new ProcessIntegrationEvent(name: "test"));

        // Assert 
        //if handler is called the flag should be true
        Assert.True(testEventHandler.HandlerCalled);
    }
}

第二题

Also, regarding your first suggestion, could you give an example on how to wait for a database record to be updated? (I am using EF).

这是基于事件的系统的一个缺点,您无法准确知道它何时会被处理。但是如果你在你的本地开发机器 and/or CI 服务器上测试 运行 你可以预测并做一些事情来确保你事后检查状态(在事件被处理之后) .

你可以做的是:

  • 实施等待一段时间的机制,直到您第一次检查状态。您可以使用一些硬编码的最小延迟时间来预测这一点。例如,您知道向 RabbitMQ 发送一条消息,即使它在同一台服务器上 运行ning 也会花费您 10 毫秒(这些只是随机数,如果是 10 毫秒或其他一些则必须验证价值)。最重要的是接收另一端的事件例如也需要 10 毫秒。例如,通过这种方式,您可以确定事件不会在 20 毫秒之前得到处理。这样,您始终可以 运行 在最短延迟期后首次检查数据库中的状态。

  • 实施重试逻辑以从某些数据存储(数据库或类似的)获取数据。您可以尝试多次,但会有一些延迟。经过一些合理的次数(您需要自己定义)后,您可以停止重试。

  • 如果您将正在处理的事件保存在本地微服务数据库中,那么您可以检查直到它到达那里应用前面的 2 个建议

根据我的经验,在您的开发机器或 CI 服务器上进行 运行ning 测试的最小延迟在大多数情况下都没有问题,事件将在该延迟期后处理。在某些边缘情况下进行重试最有帮助,例如事件处理需要更长的时间。

现在问题来了,你是如何做到这一点的?有很多方法。一种方法是在 Eventbus 周围创建一个包装器 class 以进行集成测试。每次在事件发布后发布事件时,您都可以在最短延迟时间内停止线程:

[Fact]
public async Task IfReceivesAndPublishesEvents()
{
    // Arrange
    _rabbitMQBus.Subscribe<
          ProcessCompletedIntegrationEvent,
          ProcessCompletedIntegrationEventHandler>();

    // You would need to register dependecies if you use DI here
    // or in some central TestFixture class or similar
    // you would do something like

    // Act
    EventbusIntegrationTest.Publish(new ProcessIntegrationEvent(name: "test"));

    // Assert 
    //Assert state has changed ....
}

public static class EventbusIntegrationTest
{
    private static int minimumLatency = 20;

    public static void Publish(Event @event)
    {
        Eventbus.Publish(@event);
        // after each published event you will wait at least the minimum latency time 
        // before doing any other action
        Task.Delay(minimumLatency).Wait();
    }
}