如何覆盖 MassTransit 默认交换和队列拓扑约定?
How to override MassTransit default exchange and queue topology convention?
正如[在我关于 SO 的一个问题] () 中指出的那样,RabbitMQ 的 MassTransit 会自动创建一定数量的队列并为给定的简单配置交换:
Exchanges, all fanouts:
ConsoleApp1:Program-YourMessage
: Durable
VP0003748_dotnet_bus_6n9oyyfzxhyx9ybobdmpj8qeyt
: Auto-delete and Durable?
test_queue
: Durable
Queues:
VP0003748_dotnet_bus_6n9oyyfzxhyx9ybobdmpj8qeyt
: x-expire 60000
test_queue
: Durable
但是,我发现无法覆盖那些交换器和队列的命名有点令人沮丧。我可以做些什么来改变它吗?
例如,如果您重构某些类型或命名空间,您可能最终会用大量不再使用的交换污染您的 RabbitMQ 实例 =/
我理解 test_queue
因为这是我的决定,非常公平。
类型很容易发生变化/重构。
这是一种简单有效的方法:https://bartwullems.blogspot.com/2018/09/masstransitchange-exchange-naming.html
但最好在此处放置一些 dotnet 核心代码,以帮助刚起步的任何人。
我们基于配置的自定义格式化程序:
public class BusEnvironmentNameFormatter : IEntityNameFormatter
{
private readonly IEntityNameFormatter _original;
private readonly string _prefix;
public BusEnvironmentNameFormatter(IEntityNameFormatter original, SomeAppSettingsSection busSettings)
{
_original = original;
_prefix = string.IsNullOrWhiteSpace(busSettings.Environment)
? string.Empty // no prefix
: $"{busSettings.Environment}:"; // custom prefix
}
// Used to rename the exchanges
public string FormatEntityName<T>()
{
var original = _original.FormatEntityName<T>();
return Format(original);
}
// Use this one to rename the queue
public string Format(string original)
{
return string.IsNullOrWhiteSpace(_prefix)
? original
: $"{_prefix}{original}";
}
}
然后要使用它,我们会做这样的事情:
var busSettings = busConfigSection.Get<SomeAppSettingsSection>();
var rabbitMqSettings = rabbitMqConfigSection.Get<SomeOtherAppSettingsSection>();
services.AddMassTransit(scConfig =>
{
scConfig.AddConsumers(consumerAssemblies);
scConfig.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(rmqConfig =>
{
rmqConfig.UseExtensionsLogging(provider.GetRequiredService<ILoggerFactory>());
// Force serialization of default values: null, false, etc
rmqConfig.ConfigureJsonSerializer(jsonSettings =>
{
jsonSettings.DefaultValueHandling = DefaultValueHandling.Include;
return jsonSettings;
});
var nameFormatter = new BusEnvironmentNameFormatter(rmqConfig.MessageTopology.EntityNameFormatter, busSettings);
var host = rmqConfig.Host(new Uri(rabbitMqSettings.ConnectionString), hostConfig =>
{
hostConfig.Username(rabbitMqSettings.Username);
hostConfig.Password(rabbitMqSettings.Password);
});
// Endpoint with custom naming
rmqConfig.ReceiveEndpoint(host, nameFormatter.Format(busSettings.Endpoint), epConfig =>
{
epConfig.PrefetchCount = busSettings.MessagePrefetchCount;
epConfig.UseMessageRetry(x => x.Interval(busSettings.MessageRetryCount, busSettings.MessageRetryInterval));
epConfig.UseInMemoryOutbox();
//TODO: Bind messages to this queue/endpoint
epConfig.MapMessagesToConsumers(provider, busSettings);
});
// Custom naming for exchanges
rmqConfig.MessageTopology.SetEntityNameFormatter(nameFormatter);
}));
});
可以使用 IRabbitMqBusFactoryConfigurator 的 OverrideDefaultBusEndpointQueueName 方法按以下方式更改队列的名称
var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
{
sbc.Host("rabbitmq://localhost/");
sbc.OverrideDefaultBusEndpointQueueName("endpoint");
});
正如[在我关于 SO 的一个问题] (
Exchanges, all fanouts:
ConsoleApp1:Program-YourMessage
: DurableVP0003748_dotnet_bus_6n9oyyfzxhyx9ybobdmpj8qeyt
: Auto-delete and Durable?test_queue
: DurableQueues:
VP0003748_dotnet_bus_6n9oyyfzxhyx9ybobdmpj8qeyt
: x-expire 60000test_queue
: Durable
但是,我发现无法覆盖那些交换器和队列的命名有点令人沮丧。我可以做些什么来改变它吗?
例如,如果您重构某些类型或命名空间,您可能最终会用大量不再使用的交换污染您的 RabbitMQ 实例 =/
我理解 test_queue
因为这是我的决定,非常公平。
类型很容易发生变化/重构。
这是一种简单有效的方法:https://bartwullems.blogspot.com/2018/09/masstransitchange-exchange-naming.html
但最好在此处放置一些 dotnet 核心代码,以帮助刚起步的任何人。
我们基于配置的自定义格式化程序:
public class BusEnvironmentNameFormatter : IEntityNameFormatter
{
private readonly IEntityNameFormatter _original;
private readonly string _prefix;
public BusEnvironmentNameFormatter(IEntityNameFormatter original, SomeAppSettingsSection busSettings)
{
_original = original;
_prefix = string.IsNullOrWhiteSpace(busSettings.Environment)
? string.Empty // no prefix
: $"{busSettings.Environment}:"; // custom prefix
}
// Used to rename the exchanges
public string FormatEntityName<T>()
{
var original = _original.FormatEntityName<T>();
return Format(original);
}
// Use this one to rename the queue
public string Format(string original)
{
return string.IsNullOrWhiteSpace(_prefix)
? original
: $"{_prefix}{original}";
}
}
然后要使用它,我们会做这样的事情:
var busSettings = busConfigSection.Get<SomeAppSettingsSection>();
var rabbitMqSettings = rabbitMqConfigSection.Get<SomeOtherAppSettingsSection>();
services.AddMassTransit(scConfig =>
{
scConfig.AddConsumers(consumerAssemblies);
scConfig.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(rmqConfig =>
{
rmqConfig.UseExtensionsLogging(provider.GetRequiredService<ILoggerFactory>());
// Force serialization of default values: null, false, etc
rmqConfig.ConfigureJsonSerializer(jsonSettings =>
{
jsonSettings.DefaultValueHandling = DefaultValueHandling.Include;
return jsonSettings;
});
var nameFormatter = new BusEnvironmentNameFormatter(rmqConfig.MessageTopology.EntityNameFormatter, busSettings);
var host = rmqConfig.Host(new Uri(rabbitMqSettings.ConnectionString), hostConfig =>
{
hostConfig.Username(rabbitMqSettings.Username);
hostConfig.Password(rabbitMqSettings.Password);
});
// Endpoint with custom naming
rmqConfig.ReceiveEndpoint(host, nameFormatter.Format(busSettings.Endpoint), epConfig =>
{
epConfig.PrefetchCount = busSettings.MessagePrefetchCount;
epConfig.UseMessageRetry(x => x.Interval(busSettings.MessageRetryCount, busSettings.MessageRetryInterval));
epConfig.UseInMemoryOutbox();
//TODO: Bind messages to this queue/endpoint
epConfig.MapMessagesToConsumers(provider, busSettings);
});
// Custom naming for exchanges
rmqConfig.MessageTopology.SetEntityNameFormatter(nameFormatter);
}));
});
可以使用 IRabbitMqBusFactoryConfigurator 的 OverrideDefaultBusEndpointQueueName 方法按以下方式更改队列的名称
var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
{
sbc.Host("rabbitmq://localhost/");
sbc.OverrideDefaultBusEndpointQueueName("endpoint");
});