如何使用 Mediator 配置 MassTransit 以发布消息?
How to configure MassTransit with Mediator to publish messages?
我是 MassTransit 和 Mediator 的新手,我有一系列事件要按连续顺序执行,我正在使用MassTransit 进程内和内存中,对于我的用例,不需要传输。
我想通过 Mediator 向消费者、sagas、活动发送和发布消息,我有下面的代码,但我想通过在 startup.cs
中注册 MassTransit 来改进它:
//asp net core 3.1 Controller
[ApiController]
public class MyController : ControllerBase
{
private readonly IProductService _productService ;
private readonly IMediator mediator;
public MyController(IProductService productService)
{
_productService = productService;
var repository = new InMemorySagaRepository<ApiSaga>();
mediator = Bus.Factory.CreateMediator(cfg =>
{
cfg.Saga<ProductSaga>(repository);
});
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] ProductContract productContract)
{
try
{
var result = await _productService.DoSomeThingAsync(productContract);
await mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = Guid.NewGuid(), result.Label });
return Ok();
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}
}
//My saga
public class ProductSaga :
ISaga,
InitiatedBy<ProductSubmittedEvent>
{
public Guid CorrelationId { get; set; }
public string State { get; private set; } = "Not Started";
public Task Consume(ConsumeContext<ProductSubmittedEvent> context)
{
var label= context.Message.Label;
State = "AwaitingForNextStep";
//...
//send next command
}
}
像这样它可以工作,但不合适,我想在我的 startup.cs
中使用 Mediator 配置公共交通以获得一个合适的实例,为此我首先删除 IMediator
,使用IPublishEndpoint
将消息发布到 Saga
并配置我的 startup.cs
,但它没有按预期工作:
//startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddMediator(cfg =>
{
cfg.AddSaga<ProductSaga>().InMemoryRepository();
});
}
//in controller using:
private readonly IPublishEndpoint _publishEndpoint;
//then
await _publishEndpoint.Publish<ProductSubmittedEvent>(
new { CorrelationId = Guid.NewGuid(), result.Label });
我得到了 System.InvalidOperationException
:
Unable to resolve service for type 'MassTransit.IPublishEndpoint' while attempting to activate 'GaaS.API.Controllers.ManageApiController'.
我尝试更新我的 startup.cs
:
var repository = new InMemorySagaRepository<ApiSaga>();
services.AddMassTransit(cfg =>
{
cfg.AddBus(provider =>
{
return Bus.Factory.CreateMediator(x =>
{
x.Saga<ProductSaga>(repository);
});
});
});
我得到了:
Cannot implicitly convert type 'MassTransit.Mediator.IMediator' to 'MassTransit.IBusControl'.
如果您有任何推荐想法,感谢分享和挑战我
在项目中配置 MassTransit Mediator 的正确方法是通过 Startup.cs
文件,您似乎已经尝试过。
public void ConfigureServices(IServiceCollection services)
{
services.AddMediator(cfg =>
{
cfg.AddSaga<ProductSaga>().InMemoryRepository();
});
}
使用mediator,需要依赖IMediator
接口。您不能使用 IPublishEndpoint
或 ISendEndpointProvider
,因为它们是总线接口。由于您可以在容器中同时拥有调解器和总线实例,因此在从容器解析服务时会导致混淆。
[ApiController]
public class MyController : ControllerBase
{
private readonly IProductService _productService ;
private readonly IMediator _mediator;
public MyController(IProductService productService, IMediator mediator)
{
_productService = productService;
_mediator = mediator;
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] ProductContract productContract)
{
try
{
var result = await _productService.DoSomeThingAsync(productContract);
await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });
return Ok();
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}
}
如果您只使用调解器,并且想使用 IPublishEndpoint
,您可以自己将其添加到容器中并委托它。
services.AddSingleton<IPublishEndpoint>(provider => provider.GetService<IMediator>());
我从(优秀的)youtube video - MassTransit starting with Mediator得到这个,在那个示例中有一行代码
AddMediator()
我找不到。我相信以下设置提供了让代码基于该视频工作所需的一切...
services.AddMassTransit(config =>
{
config.AddRequestClient<ISubmitOrder>();
config.AddConsumersFromNamespaceContaining<SubmitOrderConsumer>();
config.UsingInMemory(ConfigureBus);
});
然后ConfigureBus
是:
private void ConfigureBus(IBusRegistrationContext context, IInMemoryBusFactoryConfigurator configurator)
{
configurator.ConfigureEndpoints(context);
}
我在其他地方找不到这个,因此张贴在这里。
我是 MassTransit 和 Mediator 的新手,我有一系列事件要按连续顺序执行,我正在使用MassTransit 进程内和内存中,对于我的用例,不需要传输。
我想通过 Mediator 向消费者、sagas、活动发送和发布消息,我有下面的代码,但我想通过在 startup.cs
中注册 MassTransit 来改进它:
//asp net core 3.1 Controller
[ApiController]
public class MyController : ControllerBase
{
private readonly IProductService _productService ;
private readonly IMediator mediator;
public MyController(IProductService productService)
{
_productService = productService;
var repository = new InMemorySagaRepository<ApiSaga>();
mediator = Bus.Factory.CreateMediator(cfg =>
{
cfg.Saga<ProductSaga>(repository);
});
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] ProductContract productContract)
{
try
{
var result = await _productService.DoSomeThingAsync(productContract);
await mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = Guid.NewGuid(), result.Label });
return Ok();
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}
}
//My saga
public class ProductSaga :
ISaga,
InitiatedBy<ProductSubmittedEvent>
{
public Guid CorrelationId { get; set; }
public string State { get; private set; } = "Not Started";
public Task Consume(ConsumeContext<ProductSubmittedEvent> context)
{
var label= context.Message.Label;
State = "AwaitingForNextStep";
//...
//send next command
}
}
像这样它可以工作,但不合适,我想在我的 startup.cs
中使用 Mediator 配置公共交通以获得一个合适的实例,为此我首先删除 IMediator
,使用IPublishEndpoint
将消息发布到 Saga
并配置我的 startup.cs
,但它没有按预期工作:
//startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddMediator(cfg =>
{
cfg.AddSaga<ProductSaga>().InMemoryRepository();
});
}
//in controller using:
private readonly IPublishEndpoint _publishEndpoint;
//then
await _publishEndpoint.Publish<ProductSubmittedEvent>(
new { CorrelationId = Guid.NewGuid(), result.Label });
我得到了 System.InvalidOperationException
:
Unable to resolve service for type 'MassTransit.IPublishEndpoint' while attempting to activate 'GaaS.API.Controllers.ManageApiController'.
我尝试更新我的 startup.cs
:
var repository = new InMemorySagaRepository<ApiSaga>();
services.AddMassTransit(cfg =>
{
cfg.AddBus(provider =>
{
return Bus.Factory.CreateMediator(x =>
{
x.Saga<ProductSaga>(repository);
});
});
});
我得到了:
Cannot implicitly convert type 'MassTransit.Mediator.IMediator' to 'MassTransit.IBusControl'.
如果您有任何推荐想法,感谢分享和挑战我
在项目中配置 MassTransit Mediator 的正确方法是通过 Startup.cs
文件,您似乎已经尝试过。
public void ConfigureServices(IServiceCollection services)
{
services.AddMediator(cfg =>
{
cfg.AddSaga<ProductSaga>().InMemoryRepository();
});
}
使用mediator,需要依赖IMediator
接口。您不能使用 IPublishEndpoint
或 ISendEndpointProvider
,因为它们是总线接口。由于您可以在容器中同时拥有调解器和总线实例,因此在从容器解析服务时会导致混淆。
[ApiController]
public class MyController : ControllerBase
{
private readonly IProductService _productService ;
private readonly IMediator _mediator;
public MyController(IProductService productService, IMediator mediator)
{
_productService = productService;
_mediator = mediator;
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] ProductContract productContract)
{
try
{
var result = await _productService.DoSomeThingAsync(productContract);
await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });
return Ok();
}
catch (Exception ex)
{
return BadRequest(ex.Message);
}
}
}
如果您只使用调解器,并且想使用 IPublishEndpoint
,您可以自己将其添加到容器中并委托它。
services.AddSingleton<IPublishEndpoint>(provider => provider.GetService<IMediator>());
我从(优秀的)youtube video - MassTransit starting with Mediator得到这个,在那个示例中有一行代码
AddMediator()
我找不到。我相信以下设置提供了让代码基于该视频工作所需的一切...
services.AddMassTransit(config =>
{
config.AddRequestClient<ISubmitOrder>();
config.AddConsumersFromNamespaceContaining<SubmitOrderConsumer>();
config.UsingInMemory(ConfigureBus);
});
然后ConfigureBus
是:
private void ConfigureBus(IBusRegistrationContext context, IInMemoryBusFactoryConfigurator configurator)
{
configurator.ConfigureEndpoints(context);
}
我在其他地方找不到这个,因此张贴在这里。