使用 MassTransit 7 将两个服务连接到同一个 VHost 以处理作业(JobSlotUnavailable)

Connecting two services to same VHost using MassTransit 7 to process jobs (JobSlotUnavailable)

我正在从 Hangfire 迁移到 MassTransit 7/RabbitMQ 以执行后端作业。我们有许多 windows 服务,每个服务处理一组作业。

当我单独启动服务 A 时,作业消费者正常执行,我可以看到每个作业都在审计存储接收 JobSlotAllocated 事件的地方执行。

如果我单独启动服务B,Jobs也能正常执行。

现在,如果两个服务都已启动,MassTransit 将停止释放插槽,并且只会执行前 20 个作业(并发限制设置为 20),并且审计存储会开始接收 JobSlotUnavailable 事件。

是否需要进行任何配置才能让两个服务都使用作业?

总线配置如下:

        services.AddMassTransit(busConfigurator =>
        {
            busConfigurator.SetKebabCaseEndpointNameFormatter();

    //Register commands, jobs, ...

            busConfigurator.AddRabbitMqMessageScheduler();

            busConfigurator.UsingRabbitMq((context, cfg) =>
            {



                cfg.Host(busOptions.Host, "/", h =>
                {
                    h.Username(busOptions.Username);
                    h.Password(busOptions?.Password);
                });


                cfg.UseDelayedExchangeMessageScheduler();

                

                if (serviceInstance)
                {
                    var options = new ServiceInstanceOptions();
                    options.EnableInstanceEndpoint();
                    options.EnableJobServiceEndpoints();

                    cfg.ServiceInstance(options, instance =>
                    {
                        instance.ConfigureJobService();

                        instance.ConfigureJobServiceEndpoints(a =>
                        {
                            a.StartJobTimeout = TimeSpan.FromSeconds(30);
                        });

                        instance.ConfigureEndpoints(context);
                    });
                }
                else
                {
                    cfg.ConfigureEndpoints(context);
                }
            });
        });

        services.AddMassTransitHostedService();

谢谢,

您需要为作业服务配置一个共享的 saga 存储库。您当前使用的是 in-memory,这是针对每个实例的,将无法正常工作。

您可以在此 sample 中了解如何将 EF Core 配置为作业服务的 saga 存储库。