我可以在公共交通的端点配置中使用 UsePartitioner 来对传送到同一队列的多种消息类型进行分区吗
Can I use UsePartitioner in the endpoint configuration in mass transit to partition multiple message types that are delivered to the same queue
我正在使用 masstransit 和 rabbitmq。我的队列有各种类型的消息传递给它。这些类型中的大多数都实现了 IHaveOrganisationKey。我想在管道中添加一个分区程序,以确保同时只处理一条具有给定 organizationkey 的消息(任何类型)。目标是限制并行处理同一组织的多条消息时出现的并发问题,同时允许并行处理来自不同组织的消息。
配置代码
sbc.ReceiveEndpoint(host, this.queueConfiguration.QueueName, ep =>
{
ep.PrefetchCount = this.busConfiguration.PrefetchCount;
this.queueConfiguration.ConfigureEndpoint(ep);
this.queueConfiguration.SubscribeMessages(worker, ep);
});
在队列配置中:
public override void ConfigureEndpoint(IRabbitMqReceiveEndpointConfigurator ep)
{
// This is incomplete. Am I on the right track here?
ep.UsePartitioner(1, x => x.TryGetMessage<IHaveOrganisationKey>());
}
在这种情况下,您需要单独创建分区程序,然后将其传递给每个消息类型配置。
var p = ep.CreatePartitioner(8);
ep.Consumer<ConsumerA>(x => x.Message<A>(m => m.UsePartitioner(p, c => c.Message.OrgKey)));
ep.Consumer<ConsumerB>(x => x.Message<B>(m => m.UsePartitioner(p, c => c.Message.OrgKey)));
如果您的消费者有多种消息类型,您可以在配置中指定多个 Message
语句。
我正在使用 masstransit 和 rabbitmq。我的队列有各种类型的消息传递给它。这些类型中的大多数都实现了 IHaveOrganisationKey。我想在管道中添加一个分区程序,以确保同时只处理一条具有给定 organizationkey 的消息(任何类型)。目标是限制并行处理同一组织的多条消息时出现的并发问题,同时允许并行处理来自不同组织的消息。
配置代码
sbc.ReceiveEndpoint(host, this.queueConfiguration.QueueName, ep =>
{
ep.PrefetchCount = this.busConfiguration.PrefetchCount;
this.queueConfiguration.ConfigureEndpoint(ep);
this.queueConfiguration.SubscribeMessages(worker, ep);
});
在队列配置中:
public override void ConfigureEndpoint(IRabbitMqReceiveEndpointConfigurator ep)
{
// This is incomplete. Am I on the right track here?
ep.UsePartitioner(1, x => x.TryGetMessage<IHaveOrganisationKey>());
}
在这种情况下,您需要单独创建分区程序,然后将其传递给每个消息类型配置。
var p = ep.CreatePartitioner(8);
ep.Consumer<ConsumerA>(x => x.Message<A>(m => m.UsePartitioner(p, c => c.Message.OrgKey)));
ep.Consumer<ConsumerB>(x => x.Message<B>(m => m.UsePartitioner(p, c => c.Message.OrgKey)));
如果您的消费者有多种消息类型,您可以在配置中指定多个 Message
语句。