MassTransit:无法接收从 Saga 到 Consumer 的命令 [已修复]

MassTransit: Can not receive command from Saga to Consumer [Fixed]

我有一个工作流从控制器(属于 csproj1)发布一个事件,它将被 MySaga(属于 .csproj2)使用,然后是 Saga已发布的消息,它将发送一个新命令,该命令将从 CommandConsumer(属于 csproj2)中使用。

问题:

MySaga.

发送时无法在 CommandConsumer 中接收 命令

我正在使用 Azure Service BusConsumer Saga

这是我的 startup.cs,我认为配置不当:


        services.AddMassTransit(x =>
        {                
            x.AddConsumersFromNamespaceContaining<CommandConsumer>();            
            x.AddSaga<MySaga>().InMemoryRepository();

            x.UsingAzureServiceBus((context, cfg) =>
            {
                cfg.Host(Configuration["ServiceBus:ConnectionString"]);
                cfg.ConfigureEndpoints(context);
            });
        });

这里是MySaga.cs

//...

    public readonly Uri targetEndpoint = new Uri($"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand))}");

    public async Task Consume(ConsumeContext<AceptedEvent> context)
    {
        var sendEndpoint = await context.GetSendEndpoint(targetEndpoint);
        await sendEndpoint.Send<MyCommand>(new
        {
            CorrelationId,
            context.Message.Item1,
            context.Message.Item2
        });
    }
//...

这里是CommandConsumer.cs


public class CommandConsumer: IConsumer<MyCommand>
{
    public ILogger<MyCommand> _logger { get; }
    public IService _myService { get; }
    public MyCommandConsumer(ILogger<MyCommand> logger, Iservice myService)
    {
        _logger = logger;
        _myService = myService;
    }
    

    public async Task Consume(ConsumeContext<MyCommand> context)
    {
        _logger?.LogInformation($"{context.Message.CorrelationId}");            
        try
        {
            await _myService.DoSomething();
            await context.Publish<AcceptedEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "NewState"
            });
        }
        catch (Exception ex)
        {
            await context.Publish<RefusedEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "Failed",
                Error = ex.Message
            });
            throw;
        }
     }
}

如果您有任何推荐想法,感谢您的分享和挑战!

解决方案:

正如@ChrisPatterson 所提到的,我使用了 Publish 方法,因为我的命令只有一个消费者并且它按预期工作。

不仅如此,我还修复了 ReceiveEndpoint 的配置:


var repository = new InMemorySagaRepository<MySaga>();
        x.UsingAzureServiceBus((context, cfg) =>
        {
            cfg.Host(Configuration["ServiceBus:ConnectionString"]);
            cfg.ReceiveEndpoint("queue", cfgEndpoint =>
            {
                cfgEndpoint.Saga<MySaga>(repository);
            });
            cfg.ReceiveEndpoint(KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand)),
                cfgEndpoint =>
                {
                    cfgEndpoint.ConfigureConsumer<CommandConsumer>(context);
                });

        });

生产者正在发送到这个接收端点:

public readonly Uri targetEndpoint = 
new Uri($"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand))}");

根据您的使用方式,最终将成为“我的命令”。

ConfigureEndpoints 将使用消费者 class 端点名称,这相当于:

KebabCaseEndpointNameFormatter.Instance.Consumer<CommandConsumer>()

最终成为“命令”——所以发送端点没有发送到消费者的正确地址。

如果那是该命令的唯一使用者,您可以改用 Publish,这样您就不需要知道使用者的地址。否则,您需要知道正确的目标地址。