MassTransit StateMachineSagaTestHarness - 双消息消耗
MassTransit StateMachineSagaTestHarness - double message consumption
我对 InMemoryTestHarness
和状态机 saga 有疑问。看起来每条发布的消息都被 saga 消费了两次。如果发送(未发布)消息,则不会发生该问题。当我将 CorrelationId
更改为自定义字段 - ProcessId
时,问题开始了。下面的简化示例暴露了问题。
传奇定义:
public class MySaga : MassTransitStateMachine<MySagaState>
{
public State InProgress { get; private set; }
public Event<StartProcess> StartProcess { get; private set; }
public Event<ProcessStageFinished> StageFinished { get; private set; }
public Event<ProcessFinished> ProcessFinished { get; private set; }
public MySaga()
{
InstanceState(x => x.CurrentState);
Event(() => StartProcess, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Event(() => StageFinished, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Event(() => ProcessFinished, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Initially(
When(StartProcess)
.Then(x => x.Instance.ProcessId = x.Data.ProcessId)
.TransitionTo(InProgress)
);
During(InProgress,
When(StageFinished)
.Then(x => x.Instance.Stage++)
);
During(InProgress,
When(ProcessFinished)
.Finalize()
);
}
}
public class MySagaState : SagaStateMachineInstance, ISagaVersion
{
public Guid CorrelationId { get; set; }
public int Version { get; set; }
public string ProcessId { get; set; }
public string CurrentState { get; set; }
public int Stage { get; set; }
}
public record StartProcess(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
public record ProcessStageFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
public record ProcessFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
以及 xUnit 测试:
public class MySagaTests
{
InMemoryTestHarness Harness { get; }
IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }
public MySagaTests()
{
var services = new ServiceCollection()
.AddMassTransitInMemoryTestHarness(config =>
{
config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
{
sagaConfig.UseConcurrentMessageLimit(1);
sagaConfig.UseInMemoryOutbox();
})
.InMemoryRepository();
config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
});
var serviceProvider = services.BuildServiceProvider(true);
Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();
Harness.OnConfigureInMemoryReceiveEndpoint += config =>
{
config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
};
SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
}
[Fact]
public async Task TestMySaga()
{
string processId = "newProcessId";
var correlationId = NewId.NextGuid();
await Harness.Start();
try
{
await Harness.Bus.Publish(new StartProcess(correlationId, processId));
Assert.True(await Harness.Published.Any<StartProcess>());
Assert.True(await Harness.Consumed.Any<StartProcess>());
Assert.True(await SagaHarness.Consumed.Any<StartProcess>());
Assert.Equal(1, SagaHarness.Sagas.Count()); // HERE should be only one saga created
Assert.True(await SagaHarness.Created.Any(s => s.ProcessId == processId && s.CurrentState == "InProgress"));
await Harness.Bus.Publish(new ProcessStageFinished(correlationId, processId));
Assert.True(await Harness.Published.Any<ProcessStageFinished>());
Assert.True(await Harness.Consumed.Any<ProcessStageFinished>());
Assert.True(await SagaHarness.Consumed.Any<ProcessStageFinished>());
var saga = SagaHarness.Sagas.Select(s => s.ProcessId == processId).FirstOrDefault()?.Saga;
Assert.NotNull(saga);
Assert.Equal(1, saga.Stage); // HERE stage should by 1
await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));
Assert.True(await Harness.Published.Any<ProcessFinished>());
Assert.True(await Harness.Consumed.Any<ProcessFinished>());
Assert.True(await SagaHarness.Consumed.Any<ProcessFinished>());
Assert.True(await SagaHarness.Sagas.Any(s => s.ProcessId == processId && s.CurrentState == "Final"));
}
finally
{
await Harness.Stop();
}
}
}
我已经在简单的内存总线配置和 RabbitMQ 上进行了尝试。在这两种配置上它都工作正常。消息仅在 InMemoryTestHarness
.
内被消费两次
您对应该解决的问题有什么建议吗?乍一看,这似乎是一种错误的行为。
是的,删除这一行 – 您在两个不同的端点上配置 saga。
Harness.OnConfigureInMemoryReceiveEndpoint += config =>
{
config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
};
在 [@Chris Patterson] 回复后,我重新组织了测试。也许它会对某人有所帮助,重新组织下面的测试:
public class MySagaTests
{
InMemoryTestHarness Harness { get; }
IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }
public MySagaTests()
{
var services = new ServiceCollection()
.AddMassTransitInMemoryTestHarness(config =>
{
config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
{
sagaConfig.UseConcurrentMessageLimit(1);
sagaConfig.UseInMemoryOutbox();
})
.InMemoryRepository();
config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
});
var serviceProvider = services.BuildServiceProvider(true);
Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();
SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
}
[Fact]
public async Task TestMySaga()
{
string processId = "newProcessId";
var correlationId = NewId.NextGuid();
await Harness.Start();
try
{
await Harness.Bus.Publish(new StartProcess(correlationId, processId));
var sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId, x => x.InProgress)).Any();
Assert.True(sagaExists, "Saga not exists");
await Harness.Bus.Publish(new ProcessStageFinished(correlationId, processId));
sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId && s.Stage == 1, x => x.InProgress)).Any();
Assert.True(sagaExists, "Saga not exists");
await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));
sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId, x => x.Final)).Any();
Assert.True(sagaExists, "Saga not exists");
}
finally
{
await Harness.Stop();
}
}
}
我对 InMemoryTestHarness
和状态机 saga 有疑问。看起来每条发布的消息都被 saga 消费了两次。如果发送(未发布)消息,则不会发生该问题。当我将 CorrelationId
更改为自定义字段 - ProcessId
时,问题开始了。下面的简化示例暴露了问题。
传奇定义:
public class MySaga : MassTransitStateMachine<MySagaState>
{
public State InProgress { get; private set; }
public Event<StartProcess> StartProcess { get; private set; }
public Event<ProcessStageFinished> StageFinished { get; private set; }
public Event<ProcessFinished> ProcessFinished { get; private set; }
public MySaga()
{
InstanceState(x => x.CurrentState);
Event(() => StartProcess, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Event(() => StageFinished, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Event(() => ProcessFinished, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Initially(
When(StartProcess)
.Then(x => x.Instance.ProcessId = x.Data.ProcessId)
.TransitionTo(InProgress)
);
During(InProgress,
When(StageFinished)
.Then(x => x.Instance.Stage++)
);
During(InProgress,
When(ProcessFinished)
.Finalize()
);
}
}
public class MySagaState : SagaStateMachineInstance, ISagaVersion
{
public Guid CorrelationId { get; set; }
public int Version { get; set; }
public string ProcessId { get; set; }
public string CurrentState { get; set; }
public int Stage { get; set; }
}
public record StartProcess(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
public record ProcessStageFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
public record ProcessFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
以及 xUnit 测试:
public class MySagaTests
{
InMemoryTestHarness Harness { get; }
IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }
public MySagaTests()
{
var services = new ServiceCollection()
.AddMassTransitInMemoryTestHarness(config =>
{
config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
{
sagaConfig.UseConcurrentMessageLimit(1);
sagaConfig.UseInMemoryOutbox();
})
.InMemoryRepository();
config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
});
var serviceProvider = services.BuildServiceProvider(true);
Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();
Harness.OnConfigureInMemoryReceiveEndpoint += config =>
{
config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
};
SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
}
[Fact]
public async Task TestMySaga()
{
string processId = "newProcessId";
var correlationId = NewId.NextGuid();
await Harness.Start();
try
{
await Harness.Bus.Publish(new StartProcess(correlationId, processId));
Assert.True(await Harness.Published.Any<StartProcess>());
Assert.True(await Harness.Consumed.Any<StartProcess>());
Assert.True(await SagaHarness.Consumed.Any<StartProcess>());
Assert.Equal(1, SagaHarness.Sagas.Count()); // HERE should be only one saga created
Assert.True(await SagaHarness.Created.Any(s => s.ProcessId == processId && s.CurrentState == "InProgress"));
await Harness.Bus.Publish(new ProcessStageFinished(correlationId, processId));
Assert.True(await Harness.Published.Any<ProcessStageFinished>());
Assert.True(await Harness.Consumed.Any<ProcessStageFinished>());
Assert.True(await SagaHarness.Consumed.Any<ProcessStageFinished>());
var saga = SagaHarness.Sagas.Select(s => s.ProcessId == processId).FirstOrDefault()?.Saga;
Assert.NotNull(saga);
Assert.Equal(1, saga.Stage); // HERE stage should by 1
await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));
Assert.True(await Harness.Published.Any<ProcessFinished>());
Assert.True(await Harness.Consumed.Any<ProcessFinished>());
Assert.True(await SagaHarness.Consumed.Any<ProcessFinished>());
Assert.True(await SagaHarness.Sagas.Any(s => s.ProcessId == processId && s.CurrentState == "Final"));
}
finally
{
await Harness.Stop();
}
}
}
我已经在简单的内存总线配置和 RabbitMQ 上进行了尝试。在这两种配置上它都工作正常。消息仅在 InMemoryTestHarness
.
您对应该解决的问题有什么建议吗?乍一看,这似乎是一种错误的行为。
是的,删除这一行 – 您在两个不同的端点上配置 saga。
Harness.OnConfigureInMemoryReceiveEndpoint += config =>
{
config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
};
在 [@Chris Patterson] 回复后,我重新组织了测试。也许它会对某人有所帮助,重新组织下面的测试:
public class MySagaTests
{
InMemoryTestHarness Harness { get; }
IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }
public MySagaTests()
{
var services = new ServiceCollection()
.AddMassTransitInMemoryTestHarness(config =>
{
config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
{
sagaConfig.UseConcurrentMessageLimit(1);
sagaConfig.UseInMemoryOutbox();
})
.InMemoryRepository();
config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
});
var serviceProvider = services.BuildServiceProvider(true);
Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();
SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
}
[Fact]
public async Task TestMySaga()
{
string processId = "newProcessId";
var correlationId = NewId.NextGuid();
await Harness.Start();
try
{
await Harness.Bus.Publish(new StartProcess(correlationId, processId));
var sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId, x => x.InProgress)).Any();
Assert.True(sagaExists, "Saga not exists");
await Harness.Bus.Publish(new ProcessStageFinished(correlationId, processId));
sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId && s.Stage == 1, x => x.InProgress)).Any();
Assert.True(sagaExists, "Saga not exists");
await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));
sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId, x => x.Final)).Any();
Assert.True(sagaExists, "Saga not exists");
}
finally
{
await Harness.Stop();
}
}
}