EasyNetQ 中的 AutoSubscriber 不工作 - RabbitMQ .NET

AutoSubscriber in EasyNetQ is not working - RabbitMQ .NET

基于此,我正在尝试为我的消息创建 EasyNetQ Dispatcher。出于某种原因,当我的消息出现在队列中时,我的消费者没有被触发,我不知道可能是什么原因。

// Console entry point from the question: registers EasyNetQ and an AutoSubscriber factory
// in a ServiceCollection, then waits for a key press.
public class Program
{
    // Loads configuration, runs the service registration, then blocks on Console.ReadLine.
    static void Main(string[] args)
    {
        var config = LoadConfiguration();
        ConfigureServices(config);

        Console.ReadLine();
    }

    // Builds configuration from an optional appsettings.json located in the current directory.
    public static IConfiguration LoadConfiguration()
    {
        var builder = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);

        return builder.Build();
    }

    // Registers the configuration, EasyNetQ and an AutoSubscriber factory, then builds the provider.
    // The built provider is neither stored nor returned, and nothing visible in this class
    // requests AutoSubscriber from it.
    private static void ConfigureServices(IConfiguration config)
    {
        var services = new ServiceCollection()
                .AddSingleton(config)
                .AddEasyNetQ("host=127.0.0.1:5672;username=guest;password=guest")
                .AddSingleton<AutoSubscriber>(provider =>
                {
                    // When I put breakpoint below - is never reached. Is that correct behavior?
                    // NOTE(review): presumably this factory only runs when AutoSubscriber is first
                    // resolved from the provider, which would match the observation above - confirm.
                    var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "SomePrefix")
                    {
                        AutoSubscriberMessageDispatcher = provider.GetRequiredService<IAutoSubscriberMessageDispatcher>()
                    };
                    subscriber.Subscribe(new Assembly[] { Assembly.GetExecutingAssembly() });
                    subscriber.SubscribeAsync(new Assembly[] { Assembly.GetExecutingAssembly() });
                    return subscriber;
                });

        // The returned ServiceProvider is discarded here.
        services.BuildServiceProvider();
    }
}

下面是其余的相关代码,它看起来工作正常——所以问题可能出在 Program.cs

EasyNetQExtension

/// <summary>
/// IServiceCollection extensions that register the EasyNetQ bus, the message dispatcher
/// and every consumer class found in this assembly.
/// </summary>
public static class EasyNetQExtension
{
    /// <summary>
    /// Registers the bus and the dispatcher as singletons and every
    /// <c>IConsume&lt;T&gt;</c> / <c>IConsumeAsync&lt;T&gt;</c> implementation as transient.
    /// </summary>
    /// <param name="service">Collection that receives the registrations.</param>
    /// <param name="rabbitMqConnection">Connection string handed to <c>RabbitHutch.CreateBus</c>.</param>
    private static void InternalInitEasyNetQ(IServiceCollection service, string rabbitMqConnection)
    {
        service.AddSingleton(RabbitHutch.CreateBus(rabbitMqConnection));
        service.AddSingleton<IAutoSubscriberMessageDispatcher, ConsumerMessageDispatcher>(
            serviceProvider => new ConsumerMessageDispatcher(serviceProvider));

        RegisterConsumers(service, typeof(IConsume<>));
        RegisterConsumers(service, typeof(IConsumeAsync<>));
    }

    /// <summary>
    /// Registers, as transient, every concrete class in the executing assembly that implements
    /// a closed version of <paramref name="openConsumerInterface"/>.
    /// </summary>
    /// <param name="service">Collection that receives the registrations.</param>
    /// <param name="openConsumerInterface">Open generic interface definition, e.g. <c>typeof(IConsume&lt;&gt;)</c>.</param>
    private static void RegisterConsumers(IServiceCollection service, System.Type openConsumerInterface)
    {
        // Assembly.GetExecutingAssembly() is the assembly that contains this extension class,
        // so only consumers compiled into the same assembly are discovered.
        var consumerTypes = Assembly.GetExecutingAssembly().GetTypes()
            .Where(type => type.IsClass && !type.IsAbstract)
            // Compare the generic type definition rather than the simple name, so an unrelated
            // interface that merely shares the name is not picked up.
            .Where(type => type.GetInterfaces().Any(candidate =>
                candidate.IsGenericType && candidate.GetGenericTypeDefinition() == openConsumerInterface));

        foreach (var consumerType in consumerTypes)
        {
            service.AddTransient(consumerType);
        }
    }

    /// <summary>
    /// Adds EasyNetQ (bus, dispatcher and consumers) to the service collection.
    /// </summary>
    /// <param name="service">Collection to extend.</param>
    /// <param name="rabbitMqConnectionString">RabbitMQ connection string.</param>
    /// <returns>The same <paramref name="service"/> instance, to allow chaining.</returns>
    public static IServiceCollection AddEasyNetQ(this IServiceCollection service, string rabbitMqConnectionString)
    {
        InternalInitEasyNetQ(service, rabbitMqConnectionString);

        return service;
    }
}

ConsumerMessageDispatcher

/// <summary>
/// Auto-subscriber dispatcher that resolves each consumer from the dependency-injection
/// container and hands the incoming message to it.
/// </summary>
public class ConsumerMessageDispatcher : IAutoSubscriberMessageDispatcher
{
    private readonly IServiceProvider _serviceProvider;

    /// <summary>Creates a dispatcher that resolves consumers from <paramref name="serviceProvider"/>.</summary>
    /// <param name="serviceProvider">Container used to resolve consumer instances.</param>
    /// <exception cref="ArgumentNullException"><paramref name="serviceProvider"/> is null.</exception>
    public ConsumerMessageDispatcher(IServiceProvider serviceProvider)
    {
        // Fail at construction time instead of with a NullReferenceException on the first message.
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    /// <summary>Resolves <typeparamref name="TConsumer"/> and passes it the message synchronously.</summary>
    /// <param name="message">Message to consume.</param>
    /// <param name="cancellationToken">Token forwarded to the consumer.</param>
    public void Dispatch<TMessage, TConsumer>(TMessage message, CancellationToken cancellationToken = new CancellationToken())
        where TMessage : class
        where TConsumer : class, IConsume<TMessage>
    {
        TConsumer consumer = _serviceProvider.GetRequiredService<TConsumer>();

        // Forward the token so the consumer can observe cancellation; exceptions propagate unchanged.
        consumer.Consume(message, cancellationToken);
    }

    /// <summary>Resolves <typeparamref name="TConsumer"/> and awaits its asynchronous handler.</summary>
    /// <param name="message">Message to consume.</param>
    /// <param name="cancellationToken">Token forwarded to the consumer.</param>
    /// <returns>A task that completes when the consumer has finished.</returns>
    public async Task DispatchAsync<TMessage, TConsumer>(TMessage message, CancellationToken cancellationToken = new CancellationToken())
        where TMessage : class
        where TConsumer : class, IConsumeAsync<TMessage>
    {
        TConsumer consumer = _serviceProvider.GetRequiredService<TConsumer>();

        // Forward the token so the consumer can observe cancellation; exceptions propagate unchanged.
        await consumer.ConsumeAsync(message, cancellationToken);
    }
}

首先,您必须在控制台应用程序中实现 IConsume<> 接口,以便可以通过 InternalInitEasyNetQ 方法进行注册。

using EasyNetQ.AutoSubscribe;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;

namespace MyApp
{
    /// <summary>
    /// Sample consumer for plain string messages: logs the message and echoes it to the console.
    /// </summary>
    public class ConsumeTextMessage : IConsume<string>
    {
        private readonly ILogger _logger;

        /// <summary>Creates the consumer with the logger it writes to.</summary>
        /// <param name="logger">Logger used for each received message.</param>
        public ConsumeTextMessage(ILogger<ConsumeTextMessage> logger)
        {
            _logger = logger;
        }

        /// <summary>Logs the received message and writes it to the console.</summary>
        /// <param name="message">Text of the received message.</param>
        /// <param name="cancellationToken">Not used by this consumer.</param>
        public void Consume(string message, CancellationToken cancellationToken = default)
        {
            // Keep the template constant and pass the payload as an argument, so the message
            // is captured as a structured value and is never parsed as part of the template.
            _logger.LogInformation("Logging the message: {Message}", message);
            Console.WriteLine("Reading the message: " + message);
        }
    }
}

其次,您缺少使用 IAutoSubscriberMessageDispatcher 分发消息的部分。您可以使用 IServiceProvider 或依赖注入来解析 IAutoSubscriberMessageDispatcher 接口。大致如下:

// Resolve the dispatcher from the container and push one test message through it by hand.
IAutoSubscriberMessageDispatcher messageDispatcher =
    _provider.GetRequiredService<IAutoSubscriberMessageDispatcher>();
string payload = "Dispatch my message - " + DateTime.Now;
messageDispatcher.Dispatch<string, ConsumeTextMessage>(payload);

我通过将 AutoSubscriber 的创建移到单独的方法中解决了这个问题,现在看起来工作正常:

// Console host: builds the container, wires up the EasyNetQ auto-subscriber explicitly,
// then keeps the process alive until a line is read from the console.
public class Program
{
    static void Main(string[] args)
    {
        IConfiguration configuration = LoadConfiguration();
        ServiceProvider serviceProvider = ConfigureServices(configuration);

        // Subscriptions are created eagerly here, using the provider that was just built.
        ConfigureConsumers(serviceProvider);

        Console.ReadLine();
    }

    // Reads settings from an optional appsettings.json in the current working directory.
    public static IConfiguration LoadConfiguration()
    {
        return new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
            .Build();
    }

    // Registers the e-mail sender, its options and EasyNetQ, and returns the built provider.
    private static ServiceProvider ConfigureServices(IConfiguration configuration)
    {
        IServiceCollection registrations = new ServiceCollection();

        registrations.AddTransient<IEmailSender, EmailSender>();
        registrations.Configure<AuthMessageSenderOptions>(
            options => configuration.GetSection("SendGridEmailSettings").Bind(options));
        registrations.AddEasyNetQ(configuration["QueueConnectionData"]);

        return registrations.BuildServiceProvider();
    }

    // Creates the AutoSubscriber from the provider and subscribes the consumers of this assembly.
    private static void ConfigureConsumers(ServiceProvider provider)
    {
        IBus bus = provider.GetRequiredService<IBus>();
        AutoSubscriber subscriber = new AutoSubscriber(bus, "SomePrefix");
        subscriber.AutoSubscriberMessageDispatcher =
            provider.GetRequiredService<IAutoSubscriberMessageDispatcher>();

        Assembly[] synchronousTargets = { Assembly.GetExecutingAssembly() };
        subscriber.Subscribe(synchronousTargets);

        Assembly[] asynchronousTargets = { Assembly.GetExecutingAssembly() };
        subscriber.SubscribeAsync(asynchronousTargets);
    }
}