消息处理程序未激活
Message handler not activating
我 运行 遇到了一些关于 rebus 的问题。
这是我的场景。
我们有三项服务
身份)发表'IdentityCreated'留言
网关) 将 'UpdateProfileCommand' 直接发送到 'profile-westeu-input' 队列
Profile) 使用来自输入队列 'profile-westeu-input' 的消息并订阅 'IdentityCreated' 消息
在Profile Service
中看到的rebus配置
因为我已经在温莎城堡注册了我的管理员。
container.Register(Classes.FromThisAssembly()
.BasedOn(typeof(IHandleMessages<>))
.WithServiceAllInterfaces()
.LifestyleTransient());
我用
配置了 Rebus
var bus = Configure.With(new CastleWindsorContainerAdapter(container))
.Logging(x => x.Trace())
.Transport(
t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
inputQueueAddress: ProfileInputQueueName, mode: AzureServiceBusMode.Standard))
.Options(o => o.SimpleRetryStrategy(ProfileErrorQueueName))
.Start();
并订阅了这样的消息类型
bus.Subscribe(typeof(Nabufit.Messages.Identity.Events.IdentityCreated)).Wait()
我预计我的处理程序会被自动调用。然而它没有:(。
我尝试过不同的解决方案
- 更改了输入队列的名称
- 创建了一个 eventemitter 程序,该程序发布了 'IdentityCreated' 类型的事件。当查看输入队列时,它存在但没有被 rebus 拾取。
奖金信息:
- 使用 azure 服务总线
- 在 Service fabric 应用程序中托管 Rebus
- 我的输入队列被命名为'profile-westeu-input'
在调查该应用程序后,我们发现我们在 OwinCommunicationListener 中的 Webapi 之间共享了 Windsor 容器,它具有一些自定义的依赖生命周期配置。这导致了两个不同的错误。
- Rebus 不接收事件,因为容器配置
- 从架构上看,与消费进程共享同一个容器并不明智
我们最终使用 Rebus 提供的 BuiltinHandlerActivation class 构建了一个特定于总线使用过程的自定义 ICommunicationListener。看起来像这样。
public class ServiceBusCommunicationListener : ICommunicationListener
{
private BuiltinHandlerActivator activator;
public async Task<string> OpenAsync(CancellationToken cancellationToken)
{
activator = new BuiltinHandlerActivator();
RegisterHandlers(activator);
var connectionString = "...";
var bus = Configure.With(activator)
.Logging(x => x.Serilog(Log.Logger))
.Transport(
t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
inputQueueAddress: "input", mode: AzureServiceBusMode.Standard))
.Options(o => o.SimpleRetryStrategy("error"))
.Start();
return connectionString;
}
private void RegisterHandlers(BuiltinHandlerActivator builtinHandlerActivator)
{
(...)
}
public async Task CloseAsync(CancellationToken cancellationToken)
{
if (activator != null)
activator.Dispose();
}
public void Abort()
{
if (activator != null)
activator.Dispose();
}
}
并将 ServicebusCommunicationListner 注册为 ServiceInstanceListener。
internal sealed class ProfileService : StatelessService
{
public ProfileService(StatelessServiceContext context)
: base(context)
{ }
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
return new[]
{
new ServiceInstanceListener(context => new ServiceBusCommunicationListener()),
};
}
}
我 运行 遇到了一些关于 rebus 的问题。
这是我的场景。 我们有三项服务
身份)发表'IdentityCreated'留言
网关) 将 'UpdateProfileCommand' 直接发送到 'profile-westeu-input' 队列
Profile) 使用来自输入队列 'profile-westeu-input' 的消息并订阅 'IdentityCreated' 消息
在Profile Service
中看到的rebus配置因为我已经在温莎城堡注册了我的管理员。
container.Register(Classes.FromThisAssembly()
.BasedOn(typeof(IHandleMessages<>))
.WithServiceAllInterfaces()
.LifestyleTransient());
我用
配置了 Rebusvar bus = Configure.With(new CastleWindsorContainerAdapter(container))
.Logging(x => x.Trace())
.Transport(
t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
inputQueueAddress: ProfileInputQueueName, mode: AzureServiceBusMode.Standard))
.Options(o => o.SimpleRetryStrategy(ProfileErrorQueueName))
.Start();
并订阅了这样的消息类型
bus.Subscribe(typeof(Nabufit.Messages.Identity.Events.IdentityCreated)).Wait()
我预计我的处理程序会被自动调用。然而它没有:(。
我尝试过不同的解决方案
- 更改了输入队列的名称
- 创建了一个 eventemitter 程序,该程序发布了 'IdentityCreated' 类型的事件。当查看输入队列时,它存在但没有被 rebus 拾取。
奖金信息:
- 使用 azure 服务总线
- 在 Service fabric 应用程序中托管 Rebus
- 我的输入队列被命名为'profile-westeu-input'
在调查该应用程序后,我们发现我们在 OwinCommunicationListener 中的 Webapi 之间共享了 Windsor 容器,它具有一些自定义的依赖生命周期配置。这导致了两个不同的错误。
- Rebus 不接收事件,因为容器配置
- 从架构上看,与消费进程共享同一个容器并不明智
我们最终使用 Rebus 提供的 BuiltinHandlerActivation class 构建了一个特定于总线使用过程的自定义 ICommunicationListener。看起来像这样。
public class ServiceBusCommunicationListener : ICommunicationListener
{
private BuiltinHandlerActivator activator;
public async Task<string> OpenAsync(CancellationToken cancellationToken)
{
activator = new BuiltinHandlerActivator();
RegisterHandlers(activator);
var connectionString = "...";
var bus = Configure.With(activator)
.Logging(x => x.Serilog(Log.Logger))
.Transport(
t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
inputQueueAddress: "input", mode: AzureServiceBusMode.Standard))
.Options(o => o.SimpleRetryStrategy("error"))
.Start();
return connectionString;
}
private void RegisterHandlers(BuiltinHandlerActivator builtinHandlerActivator)
{
(...)
}
public async Task CloseAsync(CancellationToken cancellationToken)
{
if (activator != null)
activator.Dispose();
}
public void Abort()
{
if (activator != null)
activator.Dispose();
}
}
并将 ServicebusCommunicationListner 注册为 ServiceInstanceListener。
internal sealed class ProfileService : StatelessService
{
public ProfileService(StatelessServiceContext context)
: base(context)
{ }
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
return new[]
{
new ServiceInstanceListener(context => new ServiceBusCommunicationListener()),
};
}
}