MassTransit help registering multiple sagas

I'm working on porting an old application to .NET 6 and have run into a problem registering multiple sagas with MassTransit.

services.AddMassTransit<IProcessManagerBus>(busCfg =>
{
    busCfg.AddSagaStateMachine<OrderPM, OrderPMState>()
        .EntityFrameworkRepository<OrderPMState>(efConfig =>
        {
            efConfig.ConcurrencyMode = ConcurrencyMode.Optimistic;
            efConfig.DatabaseFactory(() => new OrderStateDbContext(configuration.GetConnectionString("DB")));
        });
    
    busCfg.AddSagaStateMachine<CsaLoginPM, CsaLoginPMState>()
        .EntityFrameworkRepository<CsaLoginPMState>(efConfig =>
        {
            efConfig.ConcurrencyMode = ConcurrencyMode.Optimistic;
            efConfig.DatabaseFactory(() => new CsaLoginStateDbContext(configuration.GetConnectionString("DB")));
        });
    
    busCfg.UsingRabbitMq((context, rabbitCfg) =>
    {
        rabbitCfg.UseJsonSerializer();
        rabbitCfg.Host(new Uri(configuration.GetValue<string>("messaging:pm-bus:host-address")), hostCfg =>
        {
            hostCfg.Username(configuration.GetValue<string>("messaging:pm-bus:username"));
            hostCfg.Password(configuration.GetValue<string>("messaging:pm-bus:password"));        
            
            rabbitCfg.ReceiveEndpoint(configuration.GetValue<string>("messaging:pm-bus:receive-queue"), epCfg =>
            {
                epCfg.PrefetchCount = 10;
                epCfg.UseRetry(retryConfig => retryConfig.Exponential(5, TimeSpan.FromMilliseconds(500), TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(1)));
                
                epCfg.ConfigureSagas(context);
            });
        });
    });
});

OrderPMState works fine, but CsaLoginPMState throws the following error when it is triggered: System.InvalidOperationException: The entity type CsaLoginPMState is not part of the model for the current context.

If I comment out the registration of OrderPMState, CsaLoginPMState works fine. I suspect the two sagas end up using the same DbContext, even though each is registered with its own DbContext.

OrderStateDbContext

public class OrderStateDbContext : SagaDbContext
{
    public OrderStateDbContext(string nameOrConnectionString)
        : base(nameOrConnectionString)
    {
    }

    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new OrderPMStateMapping(); }
    }
}

CsaLoginStateDbContext

public class CsaLoginStateDbContext : SagaDbContext
{
    public CsaLoginStateDbContext(string nameOrConnectionString)
        : base(nameOrConnectionString)
    {
    }

    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new CsaLoginPMStateMapping(); }
    }
}

The legacy app uses Autofac, and the registrations look like this:

builder.Register(x =>
    EntityFrameworkSagaRepository<OrderPMState>.CreateOptimistic(() => new OrderStateDbContext(_configuration.GetConnectionString("DB"))))
    .As<ISagaRepository<OrderPMState>>().SingleInstance();

builder.Register(x =>
    EntityFrameworkSagaRepository<CsaLoginPMState>.CreateOptimistic(() => new CsaLoginStateDbContext(_configuration.GetConnectionString("DB"))))
    .As<ISagaRepository<CsaLoginPMState>>().SingleInstance();

Am I missing something?

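When using a DbContext, you should use one of the two configuration methods designed for DbContext usage. The first registers the DbContext as part of the saga repository configuration: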

busCfg.AddSagaStateMachine<CsaLoginPM, CsaLoginPMState>()
    .EntityFrameworkRepository<CsaLoginPMState>(efConfig =>
    {
        efConfig.ConcurrencyMode = ConcurrencyMode.Optimistic;

        efConfig.AddDbContext<DbContext, CsaLoginStateDbContext>((provider,builder) =>
        {
            builder.UseSqlServer(configuration.GetConnectionString("DB"), m =>
            {
                m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                m.MigrationsHistoryTable($"__{nameof(CsaLoginStateDbContext)}");
            });
        });
    });
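Note that `builder.UseSqlServer(...)` is an Entity Framework Core API, whereas the contexts in the question take a connection string in their constructor. A minimal sketch of what the context might look like after the port, assuming the MassTransit.EntityFrameworkCore package is in use and that `CsaLoginPMStateMapping` has been rewritten as an EF Core saga class map (both are assumptions, not something the question confirms):

```csharp
using System.Collections.Generic;
using MassTransit.EntityFrameworkCoreIntegration; // namespace may differ between MassTransit versions
using Microsoft.EntityFrameworkCore;

public class CsaLoginStateDbContext : SagaDbContext
{
    // The generic DbContextOptions<TContext> keeps each context's options
    // distinct when more than one DbContext is registered in the container.
    public CsaLoginStateDbContext(DbContextOptions<CsaLoginStateDbContext> options)
        : base(options)
    {
    }

    protected override IEnumerable<ISagaClassMap> Configurations
    {
        // Assumed to be an EF Core mapping for CsaLoginPMState.
        get { yield return new CsaLoginPMStateMapping(); }
    }
}
```

`OrderStateDbContext` would presumably follow the same pattern with `DbContextOptions<OrderStateDbContext>`.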

Alternatively, you can add the DbContext separately and have the saga use the existing DbContext:


services.AddDbContext<CsaLoginStateDbContext>(builder =>
    builder.UseSqlServer(configuration.GetConnectionString("DB"), m =>
    {
        m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
        m.MigrationsHistoryTable($"__{nameof(CsaLoginStateDbContext)}");
    }));

services.AddMassTransit(x =>
{
    x.AddSagaRepository<CsaLoginPMState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Optimistic;
            r.ExistingDbContext<CsaLoginStateDbContext>();
        });
});
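Applied to the two sagas from the question, the second approach might look roughly like the sketch below. It assumes both contexts have been ported to EF Core with constructors that accept `DbContextOptions<TContext>`, and that `services` and `configuration` are the same objects used in the question. It is an illustration of the pattern, not a tested drop-in replacement.

```csharp
using MassTransit;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;

// Register each saga DbContext in the container under its own concrete type.
services.AddDbContext<OrderStateDbContext>(builder =>
    builder.UseSqlServer(configuration.GetConnectionString("DB")));

services.AddDbContext<CsaLoginStateDbContext>(builder =>
    builder.UseSqlServer(configuration.GetConnectionString("DB")));

services.AddMassTransit<IProcessManagerBus>(busCfg =>
{
    // Each saga repository resolves its own context type from the container.
    busCfg.AddSagaStateMachine<OrderPM, OrderPMState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Optimistic;
            r.ExistingDbContext<OrderStateDbContext>();
        });

    busCfg.AddSagaStateMachine<CsaLoginPM, CsaLoginPMState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Optimistic;
            r.ExistingDbContext<CsaLoginStateDbContext>();
        });

    // The UsingRabbitMq(...) section from the question is omitted here
    // and would stay as it was.
});
```

Because every repository asks the container for a distinct context type, neither saga should end up with the other's model, which is what the "not part of the model for the current context" error suggests was happening.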

This is all covered in the documentation.