消息处理程序未激活

Message handler not activating

我 运行 遇到了一些关于 rebus 的问题。

这是我的场景。 我们有三项服务

身份)发表'IdentityCreated'留言

网关) 将 'UpdateProfileCommand' 直接发送到 'profile-westeu-input' 队列

Profile) 使用来自输入队列 'profile-westeu-input' 的消息并订阅 'IdentityCreated' 消息

Profile Service

中看到的rebus配置

因为我已经在温莎城堡注册了我的管理员。

container.Register(Classes.FromThisAssembly()
                  .BasedOn(typeof(IHandleMessages<>))
                  .WithServiceAllInterfaces()
                  .LifestyleTransient());

我用

配置了 Rebus
var bus = Configure.With(new CastleWindsorContainerAdapter(container))
            .Logging(x => x.Trace())
            .Transport(
                t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
                        inputQueueAddress: ProfileInputQueueName, mode: AzureServiceBusMode.Standard))
            .Options(o => o.SimpleRetryStrategy(ProfileErrorQueueName))
            .Start();

并订阅了这样的消息类型

bus.Subscribe(typeof(Nabufit.Messages.Identity.Events.IdentityCreated)).Wait()

我预计我的处理程序会被自动调用。然而它没有:(。

我尝试过不同的解决方案

奖金信息:

在调查该应用程序后,我们发现我们在 OwinCommunicationListener 中的 Webapi 之间共享了 Windsor 容器,它具有一些自定义的依赖生命周期配置。这导致了两个不同的错误。

  1. Rebus 不接收事件,因为容器配置
  2. 从架构上看,与消费进程共享同一个容器并不明智

我们最终使用 Rebus 提供的 BuiltinHandlerActivation class 构建了一个特定于总线使用过程的自定义 ICommunicationListener。看起来像这样。

 public class ServiceBusCommunicationListener : ICommunicationListener
{
    private BuiltinHandlerActivator activator;

    public async Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        activator = new BuiltinHandlerActivator();
        RegisterHandlers(activator);

        var connectionString = "...";
        var bus = Configure.With(activator)
            .Logging(x => x.Serilog(Log.Logger))
            .Transport(
                t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
                        inputQueueAddress: "input", mode: AzureServiceBusMode.Standard))
            .Options(o => o.SimpleRetryStrategy("error"))
            .Start();

        return connectionString;
    }

    private void RegisterHandlers(BuiltinHandlerActivator builtinHandlerActivator)
    {
        (...)
    }

    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        if (activator != null)
            activator.Dispose();
    }

    public void Abort()
    {
        if (activator != null)
            activator.Dispose();
    }
}

并将 ServicebusCommunicationListner 注册为 ServiceInstanceListener。

internal sealed class ProfileService : StatelessService
{
    public ProfileService(StatelessServiceContext context)
        : base(context)
    { }

    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        return new[]
        {
            new ServiceInstanceListener(context => new ServiceBusCommunicationListener()), 
        };
    }
}