添加消费者时引擎盖下的 MassTransit

MassTransit under the hood when adding a consumer

我试图了解以这种方式使用 MassTransit 时会发生什么,而不是完全依赖这个抽象层库,而是真正了解和理解幕后创建的内容及其背后的原因。

在我的应用程序中,我通过以下方式注册消费者:

container.AddMassTransit(x =>
{
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(configurationProvider.RabbitHostName);

        x.AddConsumer<FactAddedHandler>();
        x.AddConsumer<FactAddedOrderHandler>();
        x.AddConsumer<FactCategoryHandler>();

        cfg.ConfigureEndpoints(container);
    }));

});

请关注我,到目前为止我们有这样的场景:

当我查看正在通过

创建哪些队列时

rabbitmqctl list_queues name messages consumers

我明白了:

FactAddedHandler     0       1
FactAddedOrderHandler        0       1
FactCategoryHandler  0       1

这让我确实相信每个特定的消费者都有一个队列。 所以我们有这个场景:

让我们看看这些Handlers是如何定义的:

internal class FactAddedHandler : IConsumer<FactAddedIntegrationEvent>
{
    //
}


internal class FactAddedOrderHandler : IConsumer<FactAddedIntegrationEvent>
{
    //
}


internal class FactCategoryHandler : IConsumer<FactCategoryIntegrationEvent>
{
    //
}

因此,前 2 个处理程序(FactAddedHandlerFactAddedOrderHandler)订阅相同的事件(FactAddedIntegrationEvent),而另一个 (FactCategoryHandler **) 订阅另一个事件 (**FactCategoryIntegrationEvent)。

因此,我希望在广播 FactAddedIntegrationEvent[=62= 的前 2 个队列之上有 "at least" fanout 交换 ].因此,让我们通过以下方式查看交易所:

rabbitmqctl list_exchanges name type

结果是:

FactCategoryHandler     fanout
FactAddedOrderHandler      fanout
FactAddedHandler   fanout

IntegrationEvents:FactAddedIntegrationEvent  fanout
IntegrationEvents:FactCategoryIntegrationEvent    fanout

所以..我期望的是 IntegrationEvents:FactAddedIntegrationEvent 是广播 FactAddedIntegrationEvent.[=19 的扇出交换=]

我也相信公共交通默认创建另一个交换 IntegrationEvents:FactCategoryIntegrationEvent 以这样一种方式很容易将其他消费者添加到同一事件,即使在我如果只有一个。

因此我们最终会遇到这种情况,这仍然是有道理的:

我不明白并希望得到解释的是创建其他 3 个剩余交易所的原因。他们的作用是什么?他们为什么在那里?提前致谢!

对于接收端点,MassTransit 会创建同名的队列和匹配的交换。队列绑定到匹配的交换器。

消费者在接收端点上使用的消息类型用于声明交换(如上所示),并且匹配的交换绑定到这些消息类型交换。所有这些交换都是 fanout 交换。

生成的拓扑通过匹配的交换路由所有消息类型,然后路由到队列。

这在 documentation.

中用示例进行了解释

为什么匹配交换?两个答案。

首先,简单的回答,它允许使用路由键将消息发送到交换器。使用 RabbitMQ 发送到队列需要以队列名称作为路由键的空交换。

其次,它允许将额外的队列绑定到匹配的交换器以进行故障排除。这包括设置窃听以保留发送到端点的所有消息的副本(直接发送或通过已发布的消息类型交换)。