使用 MassTransit 7 将两个服务连接到同一个 VHost 以处理作业(JobSlotUnavailable)
Connecting two services to same VHost using MassTransit 7 to process jobs (JobSlotUnavailable)
我正在从 Hangfire 迁移到 MassTransit 7/RabbitMQ 以执行后端作业。我们有许多 windows 服务,每个服务处理一组作业。
当我单独启动服务 A 时,作业消费者正常执行,我可以看到每个作业都在审计存储接收 JobSlotAllocated 事件的地方执行。
如果我单独启动服务B,Jobs也能正常执行。
现在,如果两个服务都已启动,MassTransit 将停止释放插槽,并且只会执行前 20 个作业(并发限制设置为 20),并且审计存储会开始接收 JobSlotUnavailable 事件。
是否需要进行任何配置才能让两个服务都使用作业?
总线配置如下:
services.AddMassTransit(busConfigurator =>
{
busConfigurator.SetKebabCaseEndpointNameFormatter();
//Register commands, jobs, ...
busConfigurator.AddRabbitMqMessageScheduler();
busConfigurator.UsingRabbitMq((context, cfg) =>
{
cfg.Host(busOptions.Host, "/", h =>
{
h.Username(busOptions.Username);
h.Password(busOptions?.Password);
});
cfg.UseDelayedExchangeMessageScheduler();
if (serviceInstance)
{
var options = new ServiceInstanceOptions();
options.EnableInstanceEndpoint();
options.EnableJobServiceEndpoints();
cfg.ServiceInstance(options, instance =>
{
instance.ConfigureJobService();
instance.ConfigureJobServiceEndpoints(a =>
{
a.StartJobTimeout = TimeSpan.FromSeconds(30);
});
instance.ConfigureEndpoints(context);
});
}
else
{
cfg.ConfigureEndpoints(context);
}
});
});
services.AddMassTransitHostedService();
谢谢,
您需要为作业服务配置一个共享的 saga 存储库。您当前使用的是 in-memory,这是针对每个实例的,将无法正常工作。
您可以在此 sample 中了解如何将 EF Core 配置为作业服务的 saga 存储库。
我正在从 Hangfire 迁移到 MassTransit 7/RabbitMQ 以执行后端作业。我们有许多 windows 服务,每个服务处理一组作业。
当我单独启动服务 A 时,作业消费者正常执行,我可以看到每个作业都在审计存储接收 JobSlotAllocated 事件的地方执行。
如果我单独启动服务B,Jobs也能正常执行。
现在,如果两个服务都已启动,MassTransit 将停止释放插槽,并且只会执行前 20 个作业(并发限制设置为 20),并且审计存储会开始接收 JobSlotUnavailable 事件。
是否需要进行任何配置才能让两个服务都使用作业?
总线配置如下:
services.AddMassTransit(busConfigurator =>
{
busConfigurator.SetKebabCaseEndpointNameFormatter();
//Register commands, jobs, ...
busConfigurator.AddRabbitMqMessageScheduler();
busConfigurator.UsingRabbitMq((context, cfg) =>
{
cfg.Host(busOptions.Host, "/", h =>
{
h.Username(busOptions.Username);
h.Password(busOptions?.Password);
});
cfg.UseDelayedExchangeMessageScheduler();
if (serviceInstance)
{
var options = new ServiceInstanceOptions();
options.EnableInstanceEndpoint();
options.EnableJobServiceEndpoints();
cfg.ServiceInstance(options, instance =>
{
instance.ConfigureJobService();
instance.ConfigureJobServiceEndpoints(a =>
{
a.StartJobTimeout = TimeSpan.FromSeconds(30);
});
instance.ConfigureEndpoints(context);
});
}
else
{
cfg.ConfigureEndpoints(context);
}
});
});
services.AddMassTransitHostedService();
谢谢,
您需要为作业服务配置一个共享的 saga 存储库。您当前使用的是 in-memory,这是针对每个实例的,将无法正常工作。
您可以在此 sample 中了解如何将 EF Core 配置为作业服务的 saga 存储库。