如何使用 Mediator 配置 MassTransit 以发布消息?

How to configure MassTransit with Mediator to publish messages?

我是 MassTransitMediator 的新手,我有一系列事件要按连续顺序执行,我正在使用MassTransit 进程内和内存中,对于我的用例,不需要传输。

我想通过 Mediator 向消费者、sagas、活动发送和发布消息,我有下面的代码,但我想通过在 startup.cs 中注册 MassTransit 来改进它:

//asp net core 3.1 Controller

 [ApiController]
 public class MyController : ControllerBase
 {    
    private readonly IProductService _productService ;
    private readonly IMediator mediator;
    public MyController(IProductService productService)
    {
       _productService = productService;
      var repository = new InMemorySagaRepository<ApiSaga>();
      mediator = Bus.Factory.CreateMediator(cfg =>
      {
          cfg.Saga<ProductSaga>(repository);
      });
    }
   
     [HttpPost]
     public async Task<IActionResult> Post([FromBody] ProductContract productContract)
     {            
         try
         {
             var result = await _productService.DoSomeThingAsync(productContract);
             await mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = Guid.NewGuid(), result.Label });
         return Ok();
         }
         catch (Exception ex)
         {
             return BadRequest(ex.Message);
         }
      }
 }

//My saga
public class ProductSaga :
        ISaga,
        InitiatedBy<ProductSubmittedEvent>
    {
        public Guid CorrelationId { get; set; }
        public string State { get; private set; } = "Not Started";

        public Task Consume(ConsumeContext<ProductSubmittedEvent> context)
        {
            var label= context.Message.Label;
            State = "AwaitingForNextStep";
            //...
           //send next command
        }
    }

像这样它可以工作,但不合适,我想在我的 startup.cs 中使用 Mediator 配置公共交通以获得一个合适的实例,为此我首先删除 IMediator,使用IPublishEndpoint 将消息发布到 Saga 并配置我的 startup.cs,但它没有按预期工作:

//startup.cs

 public void ConfigureServices(IServiceCollection services)
 {
      services.AddMediator(cfg =>
            {
                cfg.AddSaga<ProductSaga>().InMemoryRepository();
            });
 }

//in controller using:
private readonly IPublishEndpoint _publishEndpoint;

//then
await _publishEndpoint.Publish<ProductSubmittedEvent>(
    new { CorrelationId = Guid.NewGuid(), result.Label });

我得到了 System.InvalidOperationException:

Unable to resolve service for type 'MassTransit.IPublishEndpoint' while attempting to activate 'GaaS.API.Controllers.ManageApiController'.

我尝试更新我的 startup.cs:

var repository = new InMemorySagaRepository<ApiSaga>();
            services.AddMassTransit(cfg =>
            {
                cfg.AddBus(provider =>
                {
                    return Bus.Factory.CreateMediator(x =>
                    {
                        x.Saga<ProductSaga>(repository);
                    });
                });
            });

我得到了:

Cannot implicitly convert type 'MassTransit.Mediator.IMediator' to 'MassTransit.IBusControl'.

如果您有任何推荐想法,感谢分享和挑战我

在项目中配置 MassTransit Mediator 的正确方法是通过 Startup.cs 文件,您似乎已经尝试过。

public void ConfigureServices(IServiceCollection services)
{
    services.AddMediator(cfg =>
    {
        cfg.AddSaga<ProductSaga>().InMemoryRepository();
    });
}

使用mediator,需要依赖IMediator接口。您不能使用 IPublishEndpointISendEndpointProvider,因为它们是总线接口。由于您可以在容器中同时拥有调解器和总线实例,因此在从容器解析服务时会导致混淆。

[ApiController]
public class MyController : ControllerBase
{    
    private readonly IProductService _productService ;
    private readonly IMediator _mediator;

    public MyController(IProductService productService, IMediator mediator)
    {
        _productService = productService;
        _mediator = mediator;
    }

    [HttpPost]
    public async Task<IActionResult> Post([FromBody] ProductContract productContract)
    {            
        try
        {
            var result = await _productService.DoSomeThingAsync(productContract);
            
            await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });

            return Ok();
        }
        catch (Exception ex)
        {
            return BadRequest(ex.Message);
        }
    }
}

如果您只使用调解器,并且想使用 IPublishEndpoint,您可以自己将其添加到容器中并委托它。

services.AddSingleton<IPublishEndpoint>(provider => provider.GetService<IMediator>());

我从(优秀的)youtube video - MassTransit starting with Mediator得到这个,在那个示例中有一行代码

AddMediator()

我找不到。我相信以下设置提供了让代码基于该视频工作所需的一切...

            services.AddMassTransit(config =>
            {
                config.AddRequestClient<ISubmitOrder>();
                config.AddConsumersFromNamespaceContaining<SubmitOrderConsumer>();

                config.UsingInMemory(ConfigureBus);
            });

然后ConfigureBus是:

        private void ConfigureBus(IBusRegistrationContext context, IInMemoryBusFactoryConfigurator configurator)
        {
            configurator.ConfigureEndpoints(context);
        }

我在其他地方找不到这个,因此张贴在这里。