EasyNetQ 中的 AutoSubscriber 不工作 - RabbitMQ .NET
AutoSubscriber in EasyNetQ is not working - RabbitMQ .NET
基于此,我正在尝试为我的消息创建 EasyNetQ Dispatcher(消息调度器)。出于某种原因,当消息出现在队列中时,我的消费者没有被触发,我不知道可能是什么原因。
/// <summary>
/// Console entry point from the question: loads configuration, registers services
/// (including an <c>AutoSubscriber</c> singleton factory) and then waits on stdin.
/// </summary>
public class Program
{
/// <summary>
/// Loads configuration, runs service registration and blocks on <c>Console.ReadLine()</c>.
/// </summary>
static void Main(string[] args)
{
var config = LoadConfiguration();
ConfigureServices(config);
Console.ReadLine();
}
/// <summary>
/// Builds configuration from an optional <c>appsettings.json</c> located in the
/// current directory, with reload-on-change enabled.
/// </summary>
/// <returns>The built configuration.</returns>
public static IConfiguration LoadConfiguration()
{
var builder = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
return builder.Build();
}
/// <summary>
/// Registers the configuration, the EasyNetQ services and an <c>AutoSubscriber</c>
/// singleton whose factory performs the subscriptions, then builds the provider.
/// </summary>
/// <param name="config">Configuration instance registered as a singleton.</param>
private static void ConfigureServices(IConfiguration config)
{
var services = new ServiceCollection()
.AddSingleton(config)
.AddEasyNetQ("host=127.0.0.1:5672;username=guest;password=guest")
.AddSingleton<AutoSubscriber>(provider =>
{
// When I put breakpoint below - is never reached. Is that correct behavior?
// NOTE(review): this lambda is only a registration. Nothing in this class asks the
// provider for AutoSubscriber, which is consistent with the breakpoint never being hit.
var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "SomePrefix")
{
AutoSubscriberMessageDispatcher = provider.GetRequiredService<IAutoSubscriberMessageDispatcher>()
};
subscriber.Subscribe(new Assembly[] { Assembly.GetExecutingAssembly() });
subscriber.SubscribeAsync(new Assembly[] { Assembly.GetExecutingAssembly() });
return subscriber;
});
// The built provider is discarded here: it is neither returned nor used to resolve any service.
services.BuildServiceProvider();
}
}
下面是其余的相关代码,它们看起来工作正常——所以问题可能出在 Program.cs 中。
EasyNetQExtension
/// <summary>
/// <see cref="IServiceCollection"/> extensions that register EasyNetQ and its consumers.
/// </summary>
public static class EasyNetQExtension
{
    /// <summary>
    /// Registers the bus, the auto-subscriber message dispatcher and every concrete
    /// consumer class found in the executing assembly.
    /// </summary>
    /// <param name="service">Collection that receives the registrations.</param>
    /// <param name="rabbitMqConnection">EasyNetQ connection string passed to <c>RabbitHutch.CreateBus</c>.</param>
    private static void InternalInitEasyNetQ(IServiceCollection service, string rabbitMqConnection)
    {
        service.AddSingleton(RabbitHutch.CreateBus(rabbitMqConnection));
        service.AddSingleton<IAutoSubscriberMessageDispatcher, ConsumerMessageDispatcher>(
            serviceProvider => new ConsumerMessageDispatcher(serviceProvider));

        // NOTE: GetExecutingAssembly() is the assembly that contains this class, so
        // consumers declared in other assemblies are not discovered by this scan.
        var consumerTypes = Assembly.GetExecutingAssembly().GetTypes()
            .Where(type => type.IsClass && !type.IsAbstract)
            .Where(type => type.GetInterfaces().Any(contract =>
                // Compare the open generic definition rather than the type name, so an
                // unrelated interface that happens to share the name cannot match.
                contract.IsGenericType &&
                (contract.GetGenericTypeDefinition() == typeof(IConsume<>) ||
                 contract.GetGenericTypeDefinition() == typeof(IConsumeAsync<>))));

        // Sync and async consumers are registered identically, so one pass covers both.
        foreach (var consumerType in consumerTypes)
        {
            service.AddTransient(consumerType);
        }
    }

    /// <summary>
    /// Adds EasyNetQ (bus, dispatcher and consumers) to the service collection.
    /// </summary>
    /// <param name="service">Collection to extend.</param>
    /// <param name="rabbitMqConnectionString">EasyNetQ connection string.</param>
    /// <returns>The same <paramref name="service"/> instance, to allow chaining.</returns>
    public static IServiceCollection AddEasyNetQ(this IServiceCollection service, string rabbitMqConnectionString)
    {
        InternalInitEasyNetQ(service, rabbitMqConnectionString);
        return service;
    }
}
ConsumerMessageDispatcher
/// <summary>
/// Auto-subscriber dispatcher that resolves each consumer from the dependency-injection
/// container and hands the incoming message to it.
/// </summary>
public class ConsumerMessageDispatcher : IAutoSubscriberMessageDispatcher
{
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the dispatcher.
    /// </summary>
    /// <param name="serviceProvider">Provider used to resolve consumer instances.</param>
    /// <exception cref="ArgumentNullException"><paramref name="serviceProvider"/> is null.</exception>
    public ConsumerMessageDispatcher(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    /// <summary>
    /// Resolves <typeparamref name="TConsumer"/> and invokes its synchronous <c>Consume</c>.
    /// </summary>
    /// <param name="message">Message to hand to the consumer.</param>
    /// <param name="cancellationToken">Token forwarded to the consumer.</param>
    public void Dispatch<TMessage, TConsumer>(TMessage message, CancellationToken cancellationToken = new CancellationToken())
        where TMessage : class
        where TConsumer : class, IConsume<TMessage>
    {
        // One scope per message: disposable consumers and their dependencies are released
        // when handling ends instead of accumulating on the root provider.
        using (var scope = _serviceProvider.CreateScope())
        {
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            consumer.Consume(message, cancellationToken);
        }
    }

    /// <summary>
    /// Resolves <typeparamref name="TConsumer"/> and awaits its <c>ConsumeAsync</c>.
    /// </summary>
    /// <param name="message">Message to hand to the consumer.</param>
    /// <param name="cancellationToken">Token forwarded to the consumer.</param>
    /// <returns>A task that completes when the consumer has finished.</returns>
    public async Task DispatchAsync<TMessage, TConsumer>(TMessage message, CancellationToken cancellationToken = new CancellationToken())
        where TMessage : class
        where TConsumer : class, IConsumeAsync<TMessage>
    {
        // The scope must outlive the awaited call, hence the await inside the using block.
        using (var scope = _serviceProvider.CreateScope())
        {
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            await consumer.ConsumeAsync(message, cancellationToken);
        }
    }
}
首先,您必须在控制台应用程序中实现 IConsume<>
接口,这样该消费者才能被 InternalInitEasyNetQ
方法扫描并注册。
using EasyNetQ.AutoSubscribe;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
namespace MyApp
{
    /// <summary>
    /// Consumer for plain <see cref="string"/> messages: logs the message and echoes it
    /// to the console.
    /// </summary>
    public class ConsumeTextMessage : IConsume<string>
    {
        private readonly ILogger _logger;

        /// <summary>
        /// Creates the consumer.
        /// </summary>
        /// <param name="logger">Logger used to record each received message.</param>
        public ConsumeTextMessage(ILogger<ConsumeTextMessage> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Handles one message by logging it and writing it to standard output.
        /// </summary>
        /// <param name="message">Received message text.</param>
        /// <param name="cancellationToken">Not used by this consumer.</param>
        public void Consume(string message, CancellationToken cancellationToken = default)
        {
            // Keep the template constant and pass the payload as an argument, so the
            // message text is captured as a structured value and not as part of the template.
            _logger.LogInformation("Logging the message: {Message}", message);
            Console.WriteLine("Reading the message: " + message);
        }
    }
}
其次,您缺少使用 IAutoSubscriberMessageDispatcher
分发(dispatch)消息的部分。您可以通过 IServiceProvider
或依赖注入来解析 IAutoSubscriberMessageDispatcher
接口。大致如下:
// Resolve the dispatcher from the container, then push a timestamped text message
// through ConsumeTextMessage.
var messageDispatcher = _provider.GetRequiredService<IAutoSubscriberMessageDispatcher>();
var payload = "Dispatch my message - " + DateTime.Now;
messageDispatcher.Dispatch<string, ConsumeTextMessage>(payload);
我通过把 AutoSubscriber 的创建和订阅移到一个单独的方法中解决了这个问题,正确的代码如下:
/// <summary>
/// Console host: builds the service provider, wires the EasyNetQ consumers explicitly
/// and then keeps the process alive until a line is read from stdin.
/// </summary>
public class Program
{
    /// <summary>
    /// Builds configuration and services, subscribes the consumers, then waits for input.
    /// </summary>
    static void Main(string[] args)
    {
        var serviceProvider = ConfigureServices(LoadConfiguration());
        ConfigureConsumers(serviceProvider);
        Console.ReadLine();
    }

    /// <summary>
    /// Reads the optional <c>appsettings.json</c> from the current directory.
    /// </summary>
    /// <returns>The built configuration.</returns>
    public static IConfiguration LoadConfiguration()
    {
        return new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
            .Build();
    }

    /// <summary>
    /// Registers the e-mail sender, its options and EasyNetQ, then builds the provider.
    /// </summary>
    /// <param name="configuration">Source of the SendGrid section and the queue connection string.</param>
    /// <returns>The built service provider.</returns>
    private static ServiceProvider ConfigureServices(IConfiguration configuration)
    {
        IServiceCollection registrations = new ServiceCollection();
        registrations.AddTransient<IEmailSender, EmailSender>();
        registrations.Configure<AuthMessageSenderOptions>(options => configuration.GetSection("SendGridEmailSettings").Bind(options));
        registrations.AddEasyNetQ(configuration["QueueConnectionData"]);
        return registrations.BuildServiceProvider();
    }

    /// <summary>
    /// Creates the <c>AutoSubscriber</c> by hand and subscribes the sync and async
    /// consumers of the executing assembly.
    /// </summary>
    /// <param name="provider">Provider that supplies the bus and the message dispatcher.</param>
    private static void ConfigureConsumers(ServiceProvider provider)
    {
        var bus = provider.GetRequiredService<IBus>();
        var autoSubscriber = new AutoSubscriber(bus, "SomePrefix");
        autoSubscriber.AutoSubscriberMessageDispatcher = provider.GetRequiredService<IAutoSubscriberMessageDispatcher>();

        var consumerAssemblies = new[] { Assembly.GetExecutingAssembly() };
        autoSubscriber.Subscribe(consumerAssemblies);
        autoSubscriber.SubscribeAsync(consumerAssemblies);
    }
}
基于此
/// <summary>
/// Console entry point from the question: loads configuration, registers services
/// (including an <c>AutoSubscriber</c> singleton factory) and then waits on stdin.
/// </summary>
public class Program
{
/// <summary>
/// Loads configuration, runs service registration and blocks on <c>Console.ReadLine()</c>.
/// </summary>
static void Main(string[] args)
{
var config = LoadConfiguration();
ConfigureServices(config);
Console.ReadLine();
}
/// <summary>
/// Builds configuration from an optional <c>appsettings.json</c> located in the
/// current directory, with reload-on-change enabled.
/// </summary>
/// <returns>The built configuration.</returns>
public static IConfiguration LoadConfiguration()
{
var builder = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
return builder.Build();
}
/// <summary>
/// Registers the configuration, the EasyNetQ services and an <c>AutoSubscriber</c>
/// singleton whose factory performs the subscriptions, then builds the provider.
/// </summary>
/// <param name="config">Configuration instance registered as a singleton.</param>
private static void ConfigureServices(IConfiguration config)
{
var services = new ServiceCollection()
.AddSingleton(config)
.AddEasyNetQ("host=127.0.0.1:5672;username=guest;password=guest")
.AddSingleton<AutoSubscriber>(provider =>
{
// When I put breakpoint below - is never reached. Is that correct behavior?
// NOTE(review): this lambda is only a registration. Nothing in this class asks the
// provider for AutoSubscriber, which is consistent with the breakpoint never being hit.
var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "SomePrefix")
{
AutoSubscriberMessageDispatcher = provider.GetRequiredService<IAutoSubscriberMessageDispatcher>()
};
subscriber.Subscribe(new Assembly[] { Assembly.GetExecutingAssembly() });
subscriber.SubscribeAsync(new Assembly[] { Assembly.GetExecutingAssembly() });
return subscriber;
});
// The built provider is discarded here: it is neither returned nor used to resolve any service.
services.BuildServiceProvider();
}
}
下面是其余的相关代码,它们看起来工作正常——所以问题可能出在 Program.cs 中。
EasyNetQExtension
/// <summary>
/// <see cref="IServiceCollection"/> extensions that register EasyNetQ and its consumers.
/// </summary>
public static class EasyNetQExtension
{
    /// <summary>
    /// Registers the bus, the auto-subscriber message dispatcher and every concrete
    /// consumer class found in the executing assembly.
    /// </summary>
    /// <param name="service">Collection that receives the registrations.</param>
    /// <param name="rabbitMqConnection">EasyNetQ connection string passed to <c>RabbitHutch.CreateBus</c>.</param>
    private static void InternalInitEasyNetQ(IServiceCollection service, string rabbitMqConnection)
    {
        service.AddSingleton(RabbitHutch.CreateBus(rabbitMqConnection));
        service.AddSingleton<IAutoSubscriberMessageDispatcher, ConsumerMessageDispatcher>(
            serviceProvider => new ConsumerMessageDispatcher(serviceProvider));

        // NOTE: GetExecutingAssembly() is the assembly that contains this class, so
        // consumers declared in other assemblies are not discovered by this scan.
        var consumerTypes = Assembly.GetExecutingAssembly().GetTypes()
            .Where(type => type.IsClass && !type.IsAbstract)
            .Where(type => type.GetInterfaces().Any(contract =>
                // Compare the open generic definition rather than the type name, so an
                // unrelated interface that happens to share the name cannot match.
                contract.IsGenericType &&
                (contract.GetGenericTypeDefinition() == typeof(IConsume<>) ||
                 contract.GetGenericTypeDefinition() == typeof(IConsumeAsync<>))));

        // Sync and async consumers are registered identically, so one pass covers both.
        foreach (var consumerType in consumerTypes)
        {
            service.AddTransient(consumerType);
        }
    }

    /// <summary>
    /// Adds EasyNetQ (bus, dispatcher and consumers) to the service collection.
    /// </summary>
    /// <param name="service">Collection to extend.</param>
    /// <param name="rabbitMqConnectionString">EasyNetQ connection string.</param>
    /// <returns>The same <paramref name="service"/> instance, to allow chaining.</returns>
    public static IServiceCollection AddEasyNetQ(this IServiceCollection service, string rabbitMqConnectionString)
    {
        InternalInitEasyNetQ(service, rabbitMqConnectionString);
        return service;
    }
}
ConsumerMessageDispatcher
/// <summary>
/// Auto-subscriber dispatcher that resolves each consumer from the dependency-injection
/// container and hands the incoming message to it.
/// </summary>
public class ConsumerMessageDispatcher : IAutoSubscriberMessageDispatcher
{
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the dispatcher.
    /// </summary>
    /// <param name="serviceProvider">Provider used to resolve consumer instances.</param>
    /// <exception cref="ArgumentNullException"><paramref name="serviceProvider"/> is null.</exception>
    public ConsumerMessageDispatcher(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    /// <summary>
    /// Resolves <typeparamref name="TConsumer"/> and invokes its synchronous <c>Consume</c>.
    /// </summary>
    /// <param name="message">Message to hand to the consumer.</param>
    /// <param name="cancellationToken">Token forwarded to the consumer.</param>
    public void Dispatch<TMessage, TConsumer>(TMessage message, CancellationToken cancellationToken = new CancellationToken())
        where TMessage : class
        where TConsumer : class, IConsume<TMessage>
    {
        // One scope per message: disposable consumers and their dependencies are released
        // when handling ends instead of accumulating on the root provider.
        using (var scope = _serviceProvider.CreateScope())
        {
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            consumer.Consume(message, cancellationToken);
        }
    }

    /// <summary>
    /// Resolves <typeparamref name="TConsumer"/> and awaits its <c>ConsumeAsync</c>.
    /// </summary>
    /// <param name="message">Message to hand to the consumer.</param>
    /// <param name="cancellationToken">Token forwarded to the consumer.</param>
    /// <returns>A task that completes when the consumer has finished.</returns>
    public async Task DispatchAsync<TMessage, TConsumer>(TMessage message, CancellationToken cancellationToken = new CancellationToken())
        where TMessage : class
        where TConsumer : class, IConsumeAsync<TMessage>
    {
        // The scope must outlive the awaited call, hence the await inside the using block.
        using (var scope = _serviceProvider.CreateScope())
        {
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            await consumer.ConsumeAsync(message, cancellationToken);
        }
    }
}
首先,您必须在控制台应用程序中实现 IConsume<>
接口,这样该消费者才能被 InternalInitEasyNetQ
方法扫描并注册。
using EasyNetQ.AutoSubscribe;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
namespace MyApp
{
    /// <summary>
    /// Consumer for plain <see cref="string"/> messages: logs the message and echoes it
    /// to the console.
    /// </summary>
    public class ConsumeTextMessage : IConsume<string>
    {
        private readonly ILogger _logger;

        /// <summary>
        /// Creates the consumer.
        /// </summary>
        /// <param name="logger">Logger used to record each received message.</param>
        public ConsumeTextMessage(ILogger<ConsumeTextMessage> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Handles one message by logging it and writing it to standard output.
        /// </summary>
        /// <param name="message">Received message text.</param>
        /// <param name="cancellationToken">Not used by this consumer.</param>
        public void Consume(string message, CancellationToken cancellationToken = default)
        {
            // Keep the template constant and pass the payload as an argument, so the
            // message text is captured as a structured value and not as part of the template.
            _logger.LogInformation("Logging the message: {Message}", message);
            Console.WriteLine("Reading the message: " + message);
        }
    }
}
其次,您缺少使用 IAutoSubscriberMessageDispatcher
分发(dispatch)消息的部分。您可以通过 IServiceProvider
或依赖注入来解析 IAutoSubscriberMessageDispatcher
接口。大致如下:
// Resolve the dispatcher from the container, then push a timestamped text message
// through ConsumeTextMessage.
var messageDispatcher = _provider.GetRequiredService<IAutoSubscriberMessageDispatcher>();
var payload = "Dispatch my message - " + DateTime.Now;
messageDispatcher.Dispatch<string, ConsumeTextMessage>(payload);
我通过把 AutoSubscriber 的创建和订阅移到一个单独的方法中解决了这个问题,正确的代码如下:
/// <summary>
/// Console host: builds the service provider, wires the EasyNetQ consumers explicitly
/// and then keeps the process alive until a line is read from stdin.
/// </summary>
public class Program
{
    /// <summary>
    /// Builds configuration and services, subscribes the consumers, then waits for input.
    /// </summary>
    static void Main(string[] args)
    {
        var serviceProvider = ConfigureServices(LoadConfiguration());
        ConfigureConsumers(serviceProvider);
        Console.ReadLine();
    }

    /// <summary>
    /// Reads the optional <c>appsettings.json</c> from the current directory.
    /// </summary>
    /// <returns>The built configuration.</returns>
    public static IConfiguration LoadConfiguration()
    {
        return new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
            .Build();
    }

    /// <summary>
    /// Registers the e-mail sender, its options and EasyNetQ, then builds the provider.
    /// </summary>
    /// <param name="configuration">Source of the SendGrid section and the queue connection string.</param>
    /// <returns>The built service provider.</returns>
    private static ServiceProvider ConfigureServices(IConfiguration configuration)
    {
        IServiceCollection registrations = new ServiceCollection();
        registrations.AddTransient<IEmailSender, EmailSender>();
        registrations.Configure<AuthMessageSenderOptions>(options => configuration.GetSection("SendGridEmailSettings").Bind(options));
        registrations.AddEasyNetQ(configuration["QueueConnectionData"]);
        return registrations.BuildServiceProvider();
    }

    /// <summary>
    /// Creates the <c>AutoSubscriber</c> by hand and subscribes the sync and async
    /// consumers of the executing assembly.
    /// </summary>
    /// <param name="provider">Provider that supplies the bus and the message dispatcher.</param>
    private static void ConfigureConsumers(ServiceProvider provider)
    {
        var bus = provider.GetRequiredService<IBus>();
        var autoSubscriber = new AutoSubscriber(bus, "SomePrefix");
        autoSubscriber.AutoSubscriberMessageDispatcher = provider.GetRequiredService<IAutoSubscriberMessageDispatcher>();

        var consumerAssemblies = new[] { Assembly.GetExecutingAssembly() };
        autoSubscriber.Subscribe(consumerAssemblies);
        autoSubscriber.SubscribeAsync(consumerAssemblies);
    }
}