添加消费者时引擎盖下的 MassTransit
MassTransit under the hood when adding a consumer
我试图了解以这种方式使用 MassTransit 时会发生什么,而不是完全依赖这个抽象层库,而是真正了解和理解幕后创建的内容及其背后的原因。
在我的应用程序中,我通过以下方式注册消费者:
container.AddMassTransit(x =>
{
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(configurationProvider.RabbitHostName);
x.AddConsumer<FactAddedHandler>();
x.AddConsumer<FactAddedOrderHandler>();
x.AddConsumer<FactCategoryHandler>();
cfg.ConfigureEndpoints(container);
}));
});
请关注我,到目前为止我们有这样的场景:
当我查看正在通过
创建哪些队列时
rabbitmqctl list_queues name messages consumers
我明白了:
FactAddedHandler 0 1
FactAddedOrderHandler 0 1
FactCategoryHandler 0 1
这让我确实相信每个特定的消费者都有一个队列。
所以我们有这个场景:
让我们看看这些Handlers是如何定义的:
internal class FactAddedHandler : IConsumer<FactAddedIntegrationEvent>
{
//
}
internal class FactAddedOrderHandler : IConsumer<FactAddedIntegrationEvent>
{
//
}
internal class FactCategoryHandler : IConsumer<FactCategoryIntegrationEvent>
{
//
}
因此,前 2 个处理程序(FactAddedHandler 和 FactAddedOrderHandler)订阅相同的事件(FactAddedIntegrationEvent),而另一个 (FactCategoryHandler **) 订阅另一个事件 (**FactCategoryIntegrationEvent)。
因此,我希望在广播 FactAddedIntegrationEvent[=62= 的前 2 个队列之上有 "at least" fanout 交换 ].因此,让我们通过以下方式查看交易所:
rabbitmqctl list_exchanges name type
结果是:
FactCategoryHandler fanout
FactAddedOrderHandler fanout
FactAddedHandler fanout
IntegrationEvents:FactAddedIntegrationEvent fanout
IntegrationEvents:FactCategoryIntegrationEvent fanout
所以..我期望的是 IntegrationEvents:FactAddedIntegrationEvent 是广播 FactAddedIntegrationEvent.[=19 的扇出交换=]
我也相信公共交通默认创建另一个交换 IntegrationEvents:FactCategoryIntegrationEvent 以这样一种方式很容易将其他消费者添加到同一事件,即使在我如果只有一个。
因此我们最终会遇到这种情况,这仍然是有道理的:
我不明白并希望得到解释的是创建其他 3 个剩余交易所的原因。他们的作用是什么?他们为什么在那里?提前致谢!
对于接收端点,MassTransit 会创建同名的队列和匹配的交换。队列绑定到匹配的交换器。
消费者在接收端点上使用的消息类型用于声明交换(如上所示),并且匹配的交换绑定到这些消息类型交换。所有这些交换都是 fanout 交换。
生成的拓扑通过匹配的交换路由所有消息类型,然后路由到队列。
这在 documentation.
中用示例进行了解释
为什么匹配交换?两个答案。
首先,简单的回答,它允许使用路由键将消息发送到交换器。使用 RabbitMQ 发送到队列需要以队列名称作为路由键的空交换。
其次,它允许将额外的队列绑定到匹配的交换器以进行故障排除。这包括设置窃听以保留发送到端点的所有消息的副本(直接发送或通过已发布的消息类型交换)。
我试图了解以这种方式使用 MassTransit 时会发生什么,而不是完全依赖这个抽象层库,而是真正了解和理解幕后创建的内容及其背后的原因。
在我的应用程序中,我通过以下方式注册消费者:
container.AddMassTransit(x =>
{
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(configurationProvider.RabbitHostName);
x.AddConsumer<FactAddedHandler>();
x.AddConsumer<FactAddedOrderHandler>();
x.AddConsumer<FactCategoryHandler>();
cfg.ConfigureEndpoints(container);
}));
});
请关注我,到目前为止我们有这样的场景:
当我查看正在通过
创建哪些队列时rabbitmqctl list_queues name messages consumers
我明白了:
FactAddedHandler 0 1
FactAddedOrderHandler 0 1
FactCategoryHandler 0 1
这让我确实相信每个特定的消费者都有一个队列。 所以我们有这个场景:
让我们看看这些Handlers是如何定义的:
internal class FactAddedHandler : IConsumer<FactAddedIntegrationEvent>
{
//
}
internal class FactAddedOrderHandler : IConsumer<FactAddedIntegrationEvent>
{
//
}
internal class FactCategoryHandler : IConsumer<FactCategoryIntegrationEvent>
{
//
}
因此,前 2 个处理程序(FactAddedHandler 和 FactAddedOrderHandler)订阅相同的事件(FactAddedIntegrationEvent),而另一个 (FactCategoryHandler **) 订阅另一个事件 (**FactCategoryIntegrationEvent)。
因此,我希望在广播 FactAddedIntegrationEvent[=62= 的前 2 个队列之上有 "at least" fanout 交换 ].因此,让我们通过以下方式查看交易所:
rabbitmqctl list_exchanges name type
结果是:
FactCategoryHandler fanout
FactAddedOrderHandler fanout
FactAddedHandler fanout
IntegrationEvents:FactAddedIntegrationEvent fanout
IntegrationEvents:FactCategoryIntegrationEvent fanout
所以..我期望的是 IntegrationEvents:FactAddedIntegrationEvent 是广播 FactAddedIntegrationEvent.[=19 的扇出交换=]
我也相信公共交通默认创建另一个交换 IntegrationEvents:FactCategoryIntegrationEvent 以这样一种方式很容易将其他消费者添加到同一事件,即使在我如果只有一个。
因此我们最终会遇到这种情况,这仍然是有道理的:
我不明白并希望得到解释的是创建其他 3 个剩余交易所的原因。他们的作用是什么?他们为什么在那里?提前致谢!
对于接收端点,MassTransit 会创建同名的队列和匹配的交换。队列绑定到匹配的交换器。
消费者在接收端点上使用的消息类型用于声明交换(如上所示),并且匹配的交换绑定到这些消息类型交换。所有这些交换都是 fanout 交换。
生成的拓扑通过匹配的交换路由所有消息类型,然后路由到队列。
这在 documentation.
中用示例进行了解释为什么匹配交换?两个答案。
首先,简单的回答,它允许使用路由键将消息发送到交换器。使用 RabbitMQ 发送到队列需要以队列名称作为路由键的空交换。
其次,它允许将额外的队列绑定到匹配的交换器以进行故障排除。这包括设置窃听以保留发送到端点的所有消息的副本(直接发送或通过已发布的消息类型交换)。