在不将 MassTransit 添加到依赖项注入容器的情况下添加消费者

Add Consumers without adding MassTransit to a dependency injection container

我正在尝试实现与传输类型(即 Azure 服务总线、RabbitMQ 等)无关的 MassTransit 实现。我已经将逻辑与 DI 容器分离,因为 DI 容器将接口的范围添加到具体 class 并有一个工厂来根据配置确定传输。这适用于发布和发送,但是当我尝试执行 request/response 或消费时,创建了 RabbitMq 的 exchanges/queues 和 Azure 服务总线的主题和队列,但没有收到任何消息。

我的想法是,我可以采用 class 库,该库执行所有逻辑并在项目之间进行移植,而无需所有消费者的大量片段,因为这些消费者可能因项目而异项目。

我觉得我好像错过了一些重要的东西。在使用 AddMassTransit 将总线配置添加到 DI 容器后,如果不执行 .AddConsumer 到 DI 容器,这将无法工作,这与我正在尝试的相反。

我构建了一个集成测试项目来尝试 TDD,同时在添加到服务之前解决此功能并在此处被阻止。响应变量从不 returns 并且我最终超时。集成项目仅依赖于 MassTransit class 库和配置 class 库,实际上是直接与交通工具对话(而不是模拟它)。

我是关闭了还是需要放弃这个任务?

我当前项目的代码:

Startup.cs

        Configuration.IConfigurationProvider cfg = new ServiceFabricConfigurationProvider();
        switch (cfg.MessageService.MessagingService)
        {
            case "AzureServiceBus":
                services.AddScoped<IMassTransitTransport, MassTransitAzureServiceBusTransport>();
                break;
            case "RabbitMq":
                services.AddScoped<IMassTransitTransport, MassTransitRabbitMqTransport>();
                break;
            default:
                throw new ArgumentException("Invalid message service");
        };

        services.AddScoped<IMessagingService, MassTransitMessagingService>();

传输 class 它们非常相似,因为它们有一个共同的界面 - 下面显示了两者

IMassTransitTransport

public interface IMassTransitTransport
{
    IBusControl BusControl { get; }
}

MassTransitAzureServiceBusTransport

public sealed class MassTransitAzureServiceBusTransport : IMassTransitTransport
{
    readonly IConfigurationProvider configProvider;

    public MassTransitAzureServiceBusTransport(IConfigurationProvider configProvider)
    {
        this.configProvider = configProvider;
        BusControl = ConfigureBus();
        BusControl.StartAsync();
    }

    public IBusControl BusControl { get; }

    IBusControl ConfigureBus()
    {
        return Bus.Factory.CreateUsingAzureServiceBus(cfg => 
        {
            cfg.Host(configProvider.AzureServiceBus.AzureServiceBusConnectionString);

            cfg.ReceiveEndpoint("MyQueue", e => 
            {
                e.Consumer<ConsumerClass>();
            });
        });
    }

MassTransitRabbitMqTransport

public sealed class MassTransitRabbitMqTransport : IMassTransitTransport
{
    readonly IConfigurationProvider configProvider;

    public MassTransitRabbitMqTransport(IConfigurationProvider configProvider)
    {
        this.configProvider = configProvider;
        BusControl = ConfigureBus();
        BusControl.StartAsync();
    }

    public IBusControl BusControl { get; }

    IBusControl ConfigureBus()
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {

            cfg.Host(new Uri(configProvider.Rabbit.HostAddress), host =>
            {
                host.Username(configProvider.Rabbit.Username);
                host.Password(configProvider.Rabbit.Password);
            });

            cfg.ReceiveEndpoint("MyQueue", e => 
            {
                e.Consumer<ConsumerClass>();
            });
        });
    }
}

消息服务

public interface IMessagingService
{
    Task Publish<T>(object payload) where T : class;
    Task Send<T>(object payload) where T : class;
}

public class MassTransitMessagingService : IMessagingService
{
    readonly IMassTransitTransport massTransitTransport;

    public MassTransitMessagingService(IMassTransitTransport massTransitTransport)
    {
        //transport bus config already happens in massTransitTransport constructor
        this.massTransitTransport = massTransitTransport;
    }

    public async Task Publish<T>(object payload) where T : class
    {
        await massTransitTransport.BusControl.Publish<T>(payload);
    }

    public async Task Send<T>(object payload) where T : class
    {
        var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(new Uri(massTransitTransport.BusControl.Address, typeof(T).ToString()));
        await endpoint.Send<T>(payload);
    }
}

请求/响应接口和classes

public interface IEventRequest
{
    Guid EventGuid { get; set; }
    string Message { get; set; }
}

public interface IEventResponse
{
    Guid EventGuid { get; set; }
    string RequestMessage { get; set; }
    string ResponseMessage { get; set; }
}

public class ConsumerClass : IConsumer<IEventResponse>
{
    public async Task Consume(ConsumeContext<IEventResponse> context)
    {
        var payloadResponse = new
        {
            context.Message.EventGuid,
            context.Message.RequestMessage,
            ResponseMessage = "This is the response message;"
        };

        await context.RespondAsync(payloadResponse);
    }
}

执行一个request/response

的测试方法
    [Test]
    public async Task SendAndReceiveMessage()
    {
        // arrange
        var config = GetConfiguration();
        var transport = new MassTransitRabbitMqTransport(config);

        var payload = new
        {
            EventGuid = Guid.NewGuid(),
            Message = "This is an event message"
        };

        var clientFactory = transport.BusControl.CreateClientFactory();
        var client = clientFactory.CreateRequestClient<IEventRequest>();
        var response = await client.GetResponse<ConsumerClass>(payload);

    }

更新 1

根据 Chris Patterson 的反馈,我进行了以下修改。我相信我通过展示来自初创公司 class 的片段引起了混乱。

这里确实发生了两件事:一个 API 项目,其中包含上面的部分和下面的重构代码片段,以及一个集成测试项目。

就集成测试项目而言,真正使用的库只有3个class。集成测试项目不包含 DI 容器,因为它是一个测试项目,我想看看是否可以将逻辑与 DI 容器解耦。另外,我在集成测试项目中没有 ILogger。

整个解决方案是一个 API 项目,它有一个带有内置 DI 容器的启动器和一个实现发布的控制器。我的尝试是将 MassTransit 逻辑与 DI 容器分离,以便 MassTransitTransport 项目可以在其他地方使用,因此可以使用 MassTransit 支持的任何传输。我的问题是,就消费消息而言,这是否是一个坏主意(即,除非使用 DI 容器才能完成),或者是否可以完成。如果可以,那我是什么missing/what我做错了吗?

配置 - 利用 IConfigurationProvider 公共交通运输 - 包含消费者Class、IEventRequest、IEventResponse、EventResponse, IMassTransitTransport,MassTransitTransportRabbitMqTransport, MassTransitZaureServiceBusTransport, IMessagingService, 公共交通信息服务 MassTransitTransport.Integration - 包含 MassTransitMessagingServiceTests, NonServiceFabricConfigurationProvider

创建了 EventResponse 并在消费者中使用它Class

public class EventResponse : IEventResponse
{
    public Guid EventGuid { get; set; }
    public string RequestMessage { get; set; }
    public string ResponseMessage { get; set; }
}

消费者Class

public class ConsumerClass : IConsumer<IEventResponse>
{
    public async Task Consume(ConsumeContext<IEventResponse> context)
    {
        var payloadResponse = new EventResponse()
        {
            EventGuid = context.Message.EventGuid,
            RequestMessage = context.Message.RequestMessage,
            ResponseMessage = "This is the response message;"
        };

        await context.RespondAsync(payloadResponse);
    }
}

集成测试项目

    [Test]
    public async Task SendAndReceiveMessage()
    {
        // arrange
        var config = GetConfiguration();
        var transport = new MassTransitRabbitMqTransport(config);
        //var transport = new MassTransitAzureServiceBusTransport(config);

        var payload = new
        {
            EventGuid = Guid.NewGuid(),
            Message = "This is an event message"
        };

        var clientFactory = transport.BusControl.CreateClientFactory();
        var client = clientFactory.CreateRequestClient<IEventRequest>();
        var response = await client.GetResponse<IEventResponse>(payload);
    }

收到错误消息

MassTransit.RequestTimeoutException : 等待响应超时,RequestId: 7a000000-9a3c-0005-8037-08d80882f498

我将在评论中添加答案以关闭此问题:

有 2 个问题。第一个是 StartAsync 而不是 Start。第二个是 ConsumerClass 没有正确实现。它应该继承自 IConsumer 并具有方法 public async Task Consume(ConsumeContext context)。之后,一切正常。