如何在 .NET Core 中使用 RabbitMQ 和 MassTransit 正确使用消息?
How to properly consume message using RabbitMQ with MassTransit in .NET Core?
我正在微服务架构中实现应用程序。我想将某种事件存储附加到我的解决方案中。我不熟悉使用 RabbitMQ 进行消息传递,可能我配置不正确。让我们简化我的解决方案,想象一下我有一个简单的公共服务,它必须向总线广播一些消息。要注册代理依赖项,我使用以下代码:
internal static IServiceCollection RegisterRabbitMQDependencies(this IServiceCollection services,
IConfiguration configuration)
{
var rabbitMQSettings = configuration
.GetSection(RabbitMQSettingsSectionKey)
.Get<RabbitMQSettings>();
services
.AddSingleton(serviceProvider => MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
configurator
.Host(rabbitMQSettings.HostName,
rabbitMQSettings.VirtualHostName,
hostConfigurator =>
{
hostConfigurator.Username(rabbitMQSettings.UserName);
hostConfigurator.Password(rabbitMQSettings.Password);
});
configurator.ExchangeType = ExchangeType.Fanout;
}))
.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>())
.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>())
.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>())
.Configure<RabbitMQSettings>(configuration.GetSection(RabbitMQSettingsSectionKey));
return services;
}
由于我们的简化,它没有消费者 - 它只需要向总线发布一些消息。来自服务的消息已正确发布,因为我可以在 RabbitMq 管理系统中注意到它们。
问题是我的任何消费者都没有响应相应的消息(可能是由于某些配置错误)。回到实现——我有一个事件存储(不同的容器)。我的事件存储必须处理通过总线传输的所有集成事件。我已经在事件存储中配置了通信,如下所示:
public static IServiceCollection RegisterMessagingDependencies(this IServiceCollection services,
IConfiguration configuration) =>
services
.AddMassTransit(configurator =>
{
configurator.AddConsumer<EventHandler>();
})
.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("localhost", "/", h => {
h.Username("guest");
h.Password("guest");
});
}))
.RegisterEndpointsDependencies();
连接参数正确,我检查了几次。正如您所注意到的 - 我只有一个名为“EventHandler”的消费者,它必须处理所有 IntegrationEvents。在这里,我附上了 EventHandler class 的定义,它的父级:
public abstract class IntegrationEventHandler<TIntegrationEvent> : IIntegrationEventHandler<TIntegrationEvent>,
IConsumer<TIntegrationEvent> where TIntegrationEvent : class, IIntegrationEvent
{
public async Task Consume(ConsumeContext<TIntegrationEvent> context) =>
await HandleAsync(context.Message);
public abstract Task HandleAsync(TIntegrationEvent @event);
}
public sealed class EventHandler : IntegrationEventHandler<IIntegrationEvent>
{
private readonly IEventRepository _eventRepository;
public EventHandler(IEventRepository eventRepository)
{
_eventRepository = eventRepository;
}
public override async Task HandleAsync(IIntegrationEvent @event)
{
var readyToBeSavedEvent = new Event(@event);
await _eventRepository.CreateAsync(readyToBeSavedEvent);
await _eventRepository.SaveChangesAsync();
}
}
EventHandler base class 实现了 IConsumer 接口,所以我认为它应该以某种方式与总线上发生的所有集成事件相关联。但问题是 HandleAsync 方法从未被调用,尽管发布了一些事件。另外我附加了负责发布我的事件的代码:
public async Task PublishAsync(params IIntegrationEvent[] events)
{
var globalPublicationTasks = events
.Select(@event =>
{
@event.PublishedAtUtc = DateTime.UtcNow;
@event.JsonContent = JsonConvert.SerializeObject(@event);
return _publishEndpoint.Publish(@event);
});
await Task.WhenAll(globalPublicationTasks);
}
我错过了什么吗?我究竟做错了什么?我希望在总线上发布某些集成事件时调用 HandleAsync 方法。感谢您的帮助:)
您没有正确使用 AddMassTransit
。您也不要在消费者端启动总线。
全部properly documented,你只需要复制粘贴示例即可。
public void ConfigureServices(IServiceCollection services)
{
services.AddMassTransit(x =>
{
x.AddConsumer<EventConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.ReceiveEndpoint("event-listener", e =>
{
e.ConfigureConsumer<EventConsumer>(context);
});
});
});
services.AddMassTransitHostedService();
}
我正在微服务架构中实现应用程序。我想将某种事件存储附加到我的解决方案中。我不熟悉使用 RabbitMQ 进行消息传递,可能我配置不正确。让我们简化我的解决方案,想象一下我有一个简单的公共服务,它必须向总线广播一些消息。要注册代理依赖项,我使用以下代码:
internal static IServiceCollection RegisterRabbitMQDependencies(this IServiceCollection services,
IConfiguration configuration)
{
var rabbitMQSettings = configuration
.GetSection(RabbitMQSettingsSectionKey)
.Get<RabbitMQSettings>();
services
.AddSingleton(serviceProvider => MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
configurator
.Host(rabbitMQSettings.HostName,
rabbitMQSettings.VirtualHostName,
hostConfigurator =>
{
hostConfigurator.Username(rabbitMQSettings.UserName);
hostConfigurator.Password(rabbitMQSettings.Password);
});
configurator.ExchangeType = ExchangeType.Fanout;
}))
.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>())
.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>())
.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>())
.Configure<RabbitMQSettings>(configuration.GetSection(RabbitMQSettingsSectionKey));
return services;
}
由于我们的简化,它没有消费者 - 它只需要向总线发布一些消息。来自服务的消息已正确发布,因为我可以在 RabbitMq 管理系统中注意到它们。 问题是我的任何消费者都没有响应相应的消息(可能是由于某些配置错误)。回到实现——我有一个事件存储(不同的容器)。我的事件存储必须处理通过总线传输的所有集成事件。我已经在事件存储中配置了通信,如下所示:
public static IServiceCollection RegisterMessagingDependencies(this IServiceCollection services,
IConfiguration configuration) =>
services
.AddMassTransit(configurator =>
{
configurator.AddConsumer<EventHandler>();
})
.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("localhost", "/", h => {
h.Username("guest");
h.Password("guest");
});
}))
.RegisterEndpointsDependencies();
连接参数正确,我检查了几次。正如您所注意到的 - 我只有一个名为“EventHandler”的消费者,它必须处理所有 IntegrationEvents。在这里,我附上了 EventHandler class 的定义,它的父级:
public abstract class IntegrationEventHandler<TIntegrationEvent> : IIntegrationEventHandler<TIntegrationEvent>,
IConsumer<TIntegrationEvent> where TIntegrationEvent : class, IIntegrationEvent
{
public async Task Consume(ConsumeContext<TIntegrationEvent> context) =>
await HandleAsync(context.Message);
public abstract Task HandleAsync(TIntegrationEvent @event);
}
public sealed class EventHandler : IntegrationEventHandler<IIntegrationEvent>
{
private readonly IEventRepository _eventRepository;
public EventHandler(IEventRepository eventRepository)
{
_eventRepository = eventRepository;
}
public override async Task HandleAsync(IIntegrationEvent @event)
{
var readyToBeSavedEvent = new Event(@event);
await _eventRepository.CreateAsync(readyToBeSavedEvent);
await _eventRepository.SaveChangesAsync();
}
}
EventHandler base class 实现了 IConsumer 接口,所以我认为它应该以某种方式与总线上发生的所有集成事件相关联。但问题是 HandleAsync 方法从未被调用,尽管发布了一些事件。另外我附加了负责发布我的事件的代码:
public async Task PublishAsync(params IIntegrationEvent[] events)
{
var globalPublicationTasks = events
.Select(@event =>
{
@event.PublishedAtUtc = DateTime.UtcNow;
@event.JsonContent = JsonConvert.SerializeObject(@event);
return _publishEndpoint.Publish(@event);
});
await Task.WhenAll(globalPublicationTasks);
}
我错过了什么吗?我究竟做错了什么?我希望在总线上发布某些集成事件时调用 HandleAsync 方法。感谢您的帮助:)
您没有正确使用 AddMassTransit
。您也不要在消费者端启动总线。
全部properly documented,你只需要复制粘贴示例即可。
public void ConfigureServices(IServiceCollection services)
{
services.AddMassTransit(x =>
{
x.AddConsumer<EventConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.ReceiveEndpoint("event-listener", e =>
{
e.ConfigureConsumer<EventConsumer>(context);
});
});
});
services.AddMassTransitHostedService();
}