在不将 MassTransit 添加到依赖项注入容器的情况下添加消费者
Add Consumers without adding MassTransit to a dependency injection container
我正在尝试实现与传输类型(即 Azure 服务总线、RabbitMQ 等)无关的 MassTransit 实现。我已经将逻辑与 DI 容器分离,因为 DI 容器将接口的范围添加到具体 class 并有一个工厂来根据配置确定传输。这适用于发布和发送,但是当我尝试执行 request/response 或消费时,创建了 RabbitMq 的 exchanges/queues 和 Azure 服务总线的主题和队列,但没有收到任何消息。
我的想法是,我可以采用 class 库,该库执行所有逻辑并在项目之间进行移植,而无需所有消费者的大量片段,因为这些消费者可能因项目而异项目。
我觉得我好像错过了一些重要的东西。在使用 AddMassTransit 将总线配置添加到 DI 容器后,如果不执行 .AddConsumer 到 DI 容器,这将无法工作,这与我正在尝试的相反。
我构建了一个集成测试项目来尝试 TDD,同时在添加到服务之前解决此功能并在此处被阻止。响应变量从不 returns 并且我最终超时。集成项目仅依赖于 MassTransit class 库和配置 class 库,实际上是直接与交通工具对话(而不是模拟它)。
我是关闭了还是需要放弃这个任务?
我当前项目的代码:
Startup.cs
Configuration.IConfigurationProvider cfg = new ServiceFabricConfigurationProvider();
switch (cfg.MessageService.MessagingService)
{
case "AzureServiceBus":
services.AddScoped<IMassTransitTransport, MassTransitAzureServiceBusTransport>();
break;
case "RabbitMq":
services.AddScoped<IMassTransitTransport, MassTransitRabbitMqTransport>();
break;
default:
throw new ArgumentException("Invalid message service");
};
services.AddScoped<IMessagingService, MassTransitMessagingService>();
传输 class 它们非常相似,因为它们有一个共同的界面 - 下面显示了两者
IMassTransitTransport
public interface IMassTransitTransport
{
IBusControl BusControl { get; }
}
MassTransitAzureServiceBusTransport
public sealed class MassTransitAzureServiceBusTransport : IMassTransitTransport
{
readonly IConfigurationProvider configProvider;
public MassTransitAzureServiceBusTransport(IConfigurationProvider configProvider)
{
this.configProvider = configProvider;
BusControl = ConfigureBus();
BusControl.StartAsync();
}
public IBusControl BusControl { get; }
IBusControl ConfigureBus()
{
return Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
cfg.Host(configProvider.AzureServiceBus.AzureServiceBusConnectionString);
cfg.ReceiveEndpoint("MyQueue", e =>
{
e.Consumer<ConsumerClass>();
});
});
}
MassTransitRabbitMqTransport
public sealed class MassTransitRabbitMqTransport : IMassTransitTransport
{
readonly IConfigurationProvider configProvider;
public MassTransitRabbitMqTransport(IConfigurationProvider configProvider)
{
this.configProvider = configProvider;
BusControl = ConfigureBus();
BusControl.StartAsync();
}
public IBusControl BusControl { get; }
IBusControl ConfigureBus()
{
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri(configProvider.Rabbit.HostAddress), host =>
{
host.Username(configProvider.Rabbit.Username);
host.Password(configProvider.Rabbit.Password);
});
cfg.ReceiveEndpoint("MyQueue", e =>
{
e.Consumer<ConsumerClass>();
});
});
}
}
消息服务
public interface IMessagingService
{
Task Publish<T>(object payload) where T : class;
Task Send<T>(object payload) where T : class;
}
public class MassTransitMessagingService : IMessagingService
{
readonly IMassTransitTransport massTransitTransport;
public MassTransitMessagingService(IMassTransitTransport massTransitTransport)
{
//transport bus config already happens in massTransitTransport constructor
this.massTransitTransport = massTransitTransport;
}
public async Task Publish<T>(object payload) where T : class
{
await massTransitTransport.BusControl.Publish<T>(payload);
}
public async Task Send<T>(object payload) where T : class
{
var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(new Uri(massTransitTransport.BusControl.Address, typeof(T).ToString()));
await endpoint.Send<T>(payload);
}
}
请求/响应接口和classes
public interface IEventRequest
{
Guid EventGuid { get; set; }
string Message { get; set; }
}
public interface IEventResponse
{
Guid EventGuid { get; set; }
string RequestMessage { get; set; }
string ResponseMessage { get; set; }
}
public class ConsumerClass : IConsumer<IEventResponse>
{
public async Task Consume(ConsumeContext<IEventResponse> context)
{
var payloadResponse = new
{
context.Message.EventGuid,
context.Message.RequestMessage,
ResponseMessage = "This is the response message;"
};
await context.RespondAsync(payloadResponse);
}
}
执行一个request/response
的测试方法
[Test]
public async Task SendAndReceiveMessage()
{
// arrange
var config = GetConfiguration();
var transport = new MassTransitRabbitMqTransport(config);
var payload = new
{
EventGuid = Guid.NewGuid(),
Message = "This is an event message"
};
var clientFactory = transport.BusControl.CreateClientFactory();
var client = clientFactory.CreateRequestClient<IEventRequest>();
var response = await client.GetResponse<ConsumerClass>(payload);
}
更新 1
根据 Chris Patterson 的反馈,我进行了以下修改。我相信我通过展示来自初创公司 class 的片段引起了混乱。
这里确实发生了两件事:一个 API 项目,其中包含上面的部分和下面的重构代码片段,以及一个集成测试项目。
就集成测试项目而言,真正使用的库只有3个class。集成测试项目不包含 DI 容器,因为它是一个测试项目,我想看看是否可以将逻辑与 DI 容器解耦。另外,我在集成测试项目中没有 ILogger。
整个解决方案是一个 API 项目,它有一个带有内置 DI 容器的启动器和一个实现发布的控制器。我的尝试是将 MassTransit 逻辑与 DI 容器分离,以便 MassTransitTransport 项目可以在其他地方使用,因此可以使用 MassTransit 支持的任何传输。我的问题是,就消费消息而言,这是否是一个坏主意(即,除非使用 DI 容器才能完成),或者是否可以完成。如果可以,那我是什么missing/what我做错了吗?
配置
- 利用 IConfigurationProvider
公共交通运输
- 包含消费者Class、IEventRequest、IEventResponse、EventResponse,
IMassTransitTransport,MassTransitTransportRabbitMqTransport,
MassTransitZaureServiceBusTransport, IMessagingService,
公共交通信息服务
MassTransitTransport.Integration
- 包含 MassTransitMessagingServiceTests,
NonServiceFabricConfigurationProvider
创建了 EventResponse 并在消费者中使用它Class
public class EventResponse : IEventResponse
{
public Guid EventGuid { get; set; }
public string RequestMessage { get; set; }
public string ResponseMessage { get; set; }
}
消费者Class
public class ConsumerClass : IConsumer<IEventResponse>
{
public async Task Consume(ConsumeContext<IEventResponse> context)
{
var payloadResponse = new EventResponse()
{
EventGuid = context.Message.EventGuid,
RequestMessage = context.Message.RequestMessage,
ResponseMessage = "This is the response message;"
};
await context.RespondAsync(payloadResponse);
}
}
集成测试项目
[Test]
public async Task SendAndReceiveMessage()
{
// arrange
var config = GetConfiguration();
var transport = new MassTransitRabbitMqTransport(config);
//var transport = new MassTransitAzureServiceBusTransport(config);
var payload = new
{
EventGuid = Guid.NewGuid(),
Message = "This is an event message"
};
var clientFactory = transport.BusControl.CreateClientFactory();
var client = clientFactory.CreateRequestClient<IEventRequest>();
var response = await client.GetResponse<IEventResponse>(payload);
}
收到错误消息
MassTransit.RequestTimeoutException : 等待响应超时,RequestId: 7a000000-9a3c-0005-8037-08d80882f498
我将在评论中添加答案以关闭此问题:
有 2 个问题。第一个是 StartAsync 而不是 Start。第二个是 ConsumerClass 没有正确实现。它应该继承自 IConsumer 并具有方法 public async Task Consume(ConsumeContext context)。之后,一切正常。
我正在尝试实现与传输类型(即 Azure 服务总线、RabbitMQ 等)无关的 MassTransit 实现。我已经将逻辑与 DI 容器分离,因为 DI 容器将接口的范围添加到具体 class 并有一个工厂来根据配置确定传输。这适用于发布和发送,但是当我尝试执行 request/response 或消费时,创建了 RabbitMq 的 exchanges/queues 和 Azure 服务总线的主题和队列,但没有收到任何消息。
我的想法是,我可以采用 class 库,该库执行所有逻辑并在项目之间进行移植,而无需所有消费者的大量片段,因为这些消费者可能因项目而异项目。
我觉得我好像错过了一些重要的东西。在使用 AddMassTransit 将总线配置添加到 DI 容器后,如果不执行 .AddConsumer 到 DI 容器,这将无法工作,这与我正在尝试的相反。
我构建了一个集成测试项目来尝试 TDD,同时在添加到服务之前解决此功能并在此处被阻止。响应变量从不 returns 并且我最终超时。集成项目仅依赖于 MassTransit class 库和配置 class 库,实际上是直接与交通工具对话(而不是模拟它)。
我是关闭了还是需要放弃这个任务?
我当前项目的代码:
Startup.cs
Configuration.IConfigurationProvider cfg = new ServiceFabricConfigurationProvider();
switch (cfg.MessageService.MessagingService)
{
case "AzureServiceBus":
services.AddScoped<IMassTransitTransport, MassTransitAzureServiceBusTransport>();
break;
case "RabbitMq":
services.AddScoped<IMassTransitTransport, MassTransitRabbitMqTransport>();
break;
default:
throw new ArgumentException("Invalid message service");
};
services.AddScoped<IMessagingService, MassTransitMessagingService>();
传输 class 它们非常相似,因为它们有一个共同的界面 - 下面显示了两者
IMassTransitTransport
public interface IMassTransitTransport
{
IBusControl BusControl { get; }
}
MassTransitAzureServiceBusTransport
public sealed class MassTransitAzureServiceBusTransport : IMassTransitTransport
{
readonly IConfigurationProvider configProvider;
public MassTransitAzureServiceBusTransport(IConfigurationProvider configProvider)
{
this.configProvider = configProvider;
BusControl = ConfigureBus();
BusControl.StartAsync();
}
public IBusControl BusControl { get; }
IBusControl ConfigureBus()
{
return Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
cfg.Host(configProvider.AzureServiceBus.AzureServiceBusConnectionString);
cfg.ReceiveEndpoint("MyQueue", e =>
{
e.Consumer<ConsumerClass>();
});
});
}
MassTransitRabbitMqTransport
public sealed class MassTransitRabbitMqTransport : IMassTransitTransport
{
readonly IConfigurationProvider configProvider;
public MassTransitRabbitMqTransport(IConfigurationProvider configProvider)
{
this.configProvider = configProvider;
BusControl = ConfigureBus();
BusControl.StartAsync();
}
public IBusControl BusControl { get; }
IBusControl ConfigureBus()
{
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri(configProvider.Rabbit.HostAddress), host =>
{
host.Username(configProvider.Rabbit.Username);
host.Password(configProvider.Rabbit.Password);
});
cfg.ReceiveEndpoint("MyQueue", e =>
{
e.Consumer<ConsumerClass>();
});
});
}
}
消息服务
public interface IMessagingService
{
Task Publish<T>(object payload) where T : class;
Task Send<T>(object payload) where T : class;
}
public class MassTransitMessagingService : IMessagingService
{
readonly IMassTransitTransport massTransitTransport;
public MassTransitMessagingService(IMassTransitTransport massTransitTransport)
{
//transport bus config already happens in massTransitTransport constructor
this.massTransitTransport = massTransitTransport;
}
public async Task Publish<T>(object payload) where T : class
{
await massTransitTransport.BusControl.Publish<T>(payload);
}
public async Task Send<T>(object payload) where T : class
{
var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(new Uri(massTransitTransport.BusControl.Address, typeof(T).ToString()));
await endpoint.Send<T>(payload);
}
}
请求/响应接口和classes
public interface IEventRequest
{
Guid EventGuid { get; set; }
string Message { get; set; }
}
public interface IEventResponse
{
Guid EventGuid { get; set; }
string RequestMessage { get; set; }
string ResponseMessage { get; set; }
}
public class ConsumerClass : IConsumer<IEventResponse>
{
public async Task Consume(ConsumeContext<IEventResponse> context)
{
var payloadResponse = new
{
context.Message.EventGuid,
context.Message.RequestMessage,
ResponseMessage = "This is the response message;"
};
await context.RespondAsync(payloadResponse);
}
}
执行一个request/response
的测试方法 [Test]
public async Task SendAndReceiveMessage()
{
// arrange
var config = GetConfiguration();
var transport = new MassTransitRabbitMqTransport(config);
var payload = new
{
EventGuid = Guid.NewGuid(),
Message = "This is an event message"
};
var clientFactory = transport.BusControl.CreateClientFactory();
var client = clientFactory.CreateRequestClient<IEventRequest>();
var response = await client.GetResponse<ConsumerClass>(payload);
}
更新 1
根据 Chris Patterson 的反馈,我进行了以下修改。我相信我通过展示来自初创公司 class 的片段引起了混乱。
这里确实发生了两件事:一个 API 项目,其中包含上面的部分和下面的重构代码片段,以及一个集成测试项目。
就集成测试项目而言,真正使用的库只有3个class。集成测试项目不包含 DI 容器,因为它是一个测试项目,我想看看是否可以将逻辑与 DI 容器解耦。另外,我在集成测试项目中没有 ILogger。
整个解决方案是一个 API 项目,它有一个带有内置 DI 容器的启动器和一个实现发布的控制器。我的尝试是将 MassTransit 逻辑与 DI 容器分离,以便 MassTransitTransport 项目可以在其他地方使用,因此可以使用 MassTransit 支持的任何传输。我的问题是,就消费消息而言,这是否是一个坏主意(即,除非使用 DI 容器才能完成),或者是否可以完成。如果可以,那我是什么missing/what我做错了吗?
配置 - 利用 IConfigurationProvider 公共交通运输 - 包含消费者Class、IEventRequest、IEventResponse、EventResponse, IMassTransitTransport,MassTransitTransportRabbitMqTransport, MassTransitZaureServiceBusTransport, IMessagingService, 公共交通信息服务 MassTransitTransport.Integration - 包含 MassTransitMessagingServiceTests, NonServiceFabricConfigurationProvider
创建了 EventResponse 并在消费者中使用它Class
public class EventResponse : IEventResponse
{
public Guid EventGuid { get; set; }
public string RequestMessage { get; set; }
public string ResponseMessage { get; set; }
}
消费者Class
public class ConsumerClass : IConsumer<IEventResponse>
{
public async Task Consume(ConsumeContext<IEventResponse> context)
{
var payloadResponse = new EventResponse()
{
EventGuid = context.Message.EventGuid,
RequestMessage = context.Message.RequestMessage,
ResponseMessage = "This is the response message;"
};
await context.RespondAsync(payloadResponse);
}
}
集成测试项目
[Test]
public async Task SendAndReceiveMessage()
{
// arrange
var config = GetConfiguration();
var transport = new MassTransitRabbitMqTransport(config);
//var transport = new MassTransitAzureServiceBusTransport(config);
var payload = new
{
EventGuid = Guid.NewGuid(),
Message = "This is an event message"
};
var clientFactory = transport.BusControl.CreateClientFactory();
var client = clientFactory.CreateRequestClient<IEventRequest>();
var response = await client.GetResponse<IEventResponse>(payload);
}
收到错误消息
MassTransit.RequestTimeoutException : 等待响应超时,RequestId: 7a000000-9a3c-0005-8037-08d80882f498
我将在评论中添加答案以关闭此问题:
有 2 个问题。第一个是 StartAsync 而不是 Start。第二个是 ConsumerClass 没有正确实现。它应该继承自 IConsumer 并具有方法 public async Task Consume(ConsumeContext context)。之后,一切正常。