在 ASP.net Core 中注入多个 ServiceBusClient

Inject multiple ServiceBusClient in ASP.net Core

我想注入多个具有相同连接字符串但队列名称不同的 ServiceBusClient。

    _services.TryAddSingleton(implementationFactory =>
    {
        var serviceBusConfiguration = implementationFactory.GetRequiredService<Microsoft.Extensions.Options.IOptions<UserEventConsumerSetting>>().Value;
    
        var serviceBusClient = new ServiceBusClient(serviceBusConfiguration.ServiceBusConnectionString, new ServiceBusClientOptions
        {
            TransportType = ServiceBusTransportType.AmqpWebSockets
        });
    
        return serviceBusClient.CreateReceiver(serviceBusConfiguration.ServiceBusQueueName, new ServiceBusReceiverOptions
        {
            ReceiveMode = ServiceBusReceiveMode.PeekLock
        });
    });

为了使用,我必须创建 ServiceBusSender 实例。

    private readonly ServiceBusSender _serviceBusSender;
    
    public CarReservationMessagingService(ServiceBusSender serviceBusSender)
    {
        _serviceBusSender = serviceBusSender ?? throw new ArgumentNullException(nameof(serviceBusSender));
    }
    
    public async Task PublishNewCarReservationMessageAsync(CarReservation carReservation)
    {
        var carReservationIntegrationMessage = new CarReservationIntegrationMessage
        {
            Id = Guid.NewGuid().ToString(),
            CarId = carReservation.CarId,
            CustomerId = carReservation.CustomerId,
            RentFrom = carReservation.RentFrom,
            RentTo = carReservation.RentTo
        };
    
        var serializedMessage = JsonSerializer.Serialize(carReservationIntegrationMessage);
        ServiceBusMessage message = new ServiceBusMessage(serializedMessage);
        await _serviceBusSender.SendMessageAsync(message);
    }

如何注入多个 servicebusclient 并以不同方式使用它们?

因此,在其他 IoC 容器(如 Unity)中,可以指定命名依赖项。您在注册时指定名称,如下所示:

container.RegisterSingleto<SomeClassImplementation>("Name")

然后在class你使用的地方你应该用这个名字指定属性

[DependencyAttribute("Name")]

但是.net core中没有这个功能。 你可以使用统一。 或者你可以模仿这个功能。

让我们创建枚举(如果需要,也可以创建常量)

enum ServiceBusQueue { ServiceBus1, ServiceBus2, ServiceBus3}

然后创建服务 buc 提供者

public class ServiceBusQueueProvider
{
    private Dictionary<ServiceBusQueue, ServiceBusSender > _senders;

    public ServiceBusQueueProvider()
    {
        //Method to initialize all queues with all names and store it in _senders;
    }

    public ServiceBusSender Resolve(ServiceBusQueue queue)
    {
        return _senders[queue];
    }
}

然后注册这个provider,在其他需要获取不同队列的类中使用:

 _services.TryAddSingleton(implementationFactory => ServiceBusQueueProvider());

在构造函数中:

public SomeClass(ServiceBusQueueProvider provider)
{
    _serviceBusSender  = provider.Resolve(ServiceBusQueue.ServiceBus1);
}

还有一点是您应该更改 MessagingServiceConfiguration 此 class 映射到配置。在当前的实现中,您只能指定一个队列。 改成这样:

public class MessagingServiceConfiguration
{
    public List<MessagingServiceConfigurationItem> Queues { get; set; }
}

public class MessagingServiceConfigurationItem
{
    public string QueueName { get; set; }
    public string ListenAndSendConnectionString { get; set; }
}

您可以创建一个 class 如下所示的界面。这是一个非常基本的例子。

接口:

 public interface IAzureServiceBusConsumer
{
    Task Start();
    Task Stop();
}

实现:

public class AzureServiceBusConsumer : IAzureServiceBusConsumer
{
    //Service Bus processor 
    private ServiceBusProcessor _userAddedProcessor;        
    private ServiceBusProcessor _userUpdatedProcessor;

    private readonly IUserReceivedService _userReceived;
    private readonly IConfiguration _configuration;       


    private readonly string _serviceBusConnectionString;
    private readonly string _subscriptionUserAdded;
    private readonly string _subscriptionUserUpdated;
    private readonly string _UserAddedMessageTopic;
    

    public AzureServiceBusConsumer(IUserReceivedService userReceived ,IConfiguration configuration)
    {
        _userReceived = userReceived;
        _configuration = configuration;          

        _serviceBusConnectionString = _configuration.GetValue<string>("ServiceBusConnectionString");
        _UserAddedMessageTopic = _configuration.GetValue<string>("UserAddedMessageTopic");
        _subscriptionUserAdded = _configuration.GetValue<string>("UserAddedSubscription");                 
       

        //Create a client to receive messages
        var client = new ServiceBusClient(_serviceBusConnectionString);

        //on client call you can provide different subscription
        _userAddedProcessor = client.CreateProcessor(_UserAddedMessageTopic, _subscriptionUserAdded);
        _userUpdatedProcessor = client.CreateProcessor(_UserAddedMessageTopic, _subscriptionUserUpdated);
    }

    /// <summary>
    /// start the bus processor to look for any messages.
    /// </summary>
    /// <returns></returns>
    public async Task Start()
    {           

        _userAddedProcessor.ProcessMessageAsync += OnUserAddedMessageReceived;
        _userAddedProcessor.ProcessErrorAsync += ErrorHandler;
        await _userAddedProcessor.StartProcessingAsync();
        _userUpdatedProcessor.ProcessMessageAsync += OnUserUpdatedMessageReceived;
        _userUpdatedProcessor.ProcessErrorAsync += ErrorHandler;
        await _userUpdatedProcessor.StartProcessingAsync();
       
    }

    /// <summary>
    /// stop the bus processor and dispose conn.
    /// </summary>
    /// <returns></returns>
    public async Task Stop()
    {
        await _userAddedProcessor.StopProcessingAsync();
        await _userAddedProcessor.DisposeAsync();
        
    }
    Task ErrorHandler(ProcessErrorEventArgs args)
    {
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }

    private async Task OnUserAddedMessageReceived(ProcessMessageEventArgs args)
    {
        var message = args.Message;
        var body = Encoding.UTF8.GetString(message.Body);

        UserDto _UserRecivedDto = JsonConvert.DeserializeObject<UserDto>(body);

        //Save the recieved message then call CompleteMessageAsync
        await args.CompleteMessageAsync(args.Message);
    }

对于以后来这里的任何人,我建议使用 ServiceBusClientBuilderExtensions

public override void Configure(IFunctionsHostBuilder builder)
{
    builder.Services.AddAzureClients(clientsBuilder =>
    {
        clientsBuilder.AddServiceBusClient("Client1ConnectionString")
          .WithName("Client1Name");
        clientsBuilder.AddServiceBusClient("Client2ConnectionString")
          .WithName("Client2Name");
    });
}

为了检索它,您必须使用 IAzureClientFactory<ServiceBusClient>,如下所示:

 public Constructor(IAzureClientFactory<ServiceBusClient> serviceBusClientFactory)
 {
     ServiceBusClient client1 = serviceBusClientFactory.CreateClient("Client1Name")
 }

答案中提取和简化的代码片段。