不同线程上的 NServiceBus 和 NHibernate EventListeners 运行
NServiceBus and NHibernate EventListeners running on different threads
我想要实现的是我的网站发出一条消息并将其放在总线上,服务接收它并通过自动填充行的 AddedBy/UpdatedBy 字段的审计将其写入数据库。
我通过使用 NServiceBus IMessageMutator 组件来执行此操作,该组件将用户 ID 写入来自 Thread.CurrentPrincipal 的消息 headers,该消息来自我的 ASP.Net 应用程序中的登录用户。
在我的服务中,我使用 IMessageModule 提取此 header 并将其绑定到 Thread.CurrentPrincipal。这很好用,在我的消息处理程序中,我可以看到 Thread.CurrentPrincipal.Identity.Name 已正确绑定到在 Web 应用程序中引发消息的用户 ID。
利用 NHibernate 的 IPreUpdateEventListener/IPreInsertEventListener 我在每个实体写入数据库之前设置 AddedBy/UpdatedBy。这在网站上完美运行,但在我的 NServiceBus 服务中,侦听器运行的线程与处理程序 运行 所在的线程不同,这意味着线程的 CurrentPrincipal 不再是我的 IMessageModule 中绑定的 ID。
我可以看到 NHibernate 在调用堆栈中使用 DistributedT运行sactionFactory,我怀疑这是我的问题的原因。我不想失去 t运行sactionality 这样如果提交失败消息不是 re-tried 或放置错误 queue 并且如果从 queue 失败,更新不会回滚到数据库。
我环顾了网络,所有示例都利用线程的 CurrentPrincipal 来绑定修改行的用户的 ID。我正在寻找的是一种方法,可以将 NHibernate 侦听器与消息处理程序保持在同一线程上,或者将用户 ID 传递给侦听器,以便它可以在写入数据库之前绑定到实体上。
这是我的监听器,我省略了其中找到的 Set 方法
public class EntityPersistenceListener : IPreUpdateEventListener, IPreInsertEventListener
{
public bool OnPreUpdate(PreUpdateEvent @event)
{
var audit = @event.Entity as EntityBase;
if (audit == null)
return false;
var time = DateTimeFactory.GetDateTime();
var name = Thread.CurrentPrincipal.Identity.Name;
Set(@event.Persister, @event.State, "AddedDate", audit.AddedDate);
Set(@event.Persister, @event.State, "AddedBy", audit.AddedBy);
Set(@event.Persister, @event.State, "UpdatedDate", time);
Set(@event.Persister, @event.State, "UpdatedBy", name);
audit.AddedDate = audit.AddedDate;
audit.AddedBy = audit.AddedBy;
audit.UpdatedDate= time;
audit.UpdatedBy = name;
return false;
}
}
这里是 NServiceBus Message 模块,提取 id 并将其绑定到当前线程的标识
public class TenantAndInstanceInfoExtractor : IMessageModule
{
private readonly IBus _bus;
public TenantAndInstanceInfoExtractor(IBus bus)
{
_bus = bus;
}
public void HandleBeginMessage()
{
var headers = _bus.CurrentMessageContext.Headers;
if (headers.ContainsKey("TriggeredById"))
Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(headers["TriggeredById"]), null);
else
Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(string.Empty), null);
}
public void HandleEndMessage()
{
}
public void HandleError() { }
}
感谢 Simon 的帮助。在深入研究了我的问题并讨论了 NServiceBus 的内部工作方式之后,我采纳了您的见解并为 NServiceBus 实现了一个工作单元模块。
发生的事情是我们依赖每条消息创建的事务将我们的 NHibernate 会话提交给数据库。这发生在利用线程池的分布式事务控制器上(具体发生在此处NHibernate.Transaction.AdoNetWithDistributedTransactionFactory.DistributedTransactionContext)。
通过使用 NServiceBus 的 IManageUnitsOfWork 接口,我能够在与消息处理程序相同的线程上显式提交事务,如下面的代码示例所示。
作为对任何未来读者的旁注,这里最好的解决方案是不要使用 Thread.CurrentPrincipal,因为这个解决方案在多线程环境中失败了,就像我遇到的那样。
public class HiJumpNserviceBusUnitOfWork : IManageUnitsOfWork
{
private readonly IUnitOfWork _uow;
public HiJumpNserviceBusUnitOfWork(IUnitOfWork uow)
{
_uow = uow;
}
public void Begin()
{
_uow.ClearCache();
_uow.BeginTransaction();
}
public void End(Exception ex = null)
{
if (ex != null)
{
_uow.ClearCache();
}
else
{
_uow.CommitTransaction();
}
}
}
我想要实现的是我的网站发出一条消息并将其放在总线上,服务接收它并通过自动填充行的 AddedBy/UpdatedBy 字段的审计将其写入数据库。
我通过使用 NServiceBus IMessageMutator 组件来执行此操作,该组件将用户 ID 写入来自 Thread.CurrentPrincipal 的消息 headers,该消息来自我的 ASP.Net 应用程序中的登录用户。 在我的服务中,我使用 IMessageModule 提取此 header 并将其绑定到 Thread.CurrentPrincipal。这很好用,在我的消息处理程序中,我可以看到 Thread.CurrentPrincipal.Identity.Name 已正确绑定到在 Web 应用程序中引发消息的用户 ID。
利用 NHibernate 的 IPreUpdateEventListener/IPreInsertEventListener 我在每个实体写入数据库之前设置 AddedBy/UpdatedBy。这在网站上完美运行,但在我的 NServiceBus 服务中,侦听器运行的线程与处理程序 运行 所在的线程不同,这意味着线程的 CurrentPrincipal 不再是我的 IMessageModule 中绑定的 ID。
我可以看到 NHibernate 在调用堆栈中使用 DistributedT运行sactionFactory,我怀疑这是我的问题的原因。我不想失去 t运行sactionality 这样如果提交失败消息不是 re-tried 或放置错误 queue 并且如果从 queue 失败,更新不会回滚到数据库。
我环顾了网络,所有示例都利用线程的 CurrentPrincipal 来绑定修改行的用户的 ID。我正在寻找的是一种方法,可以将 NHibernate 侦听器与消息处理程序保持在同一线程上,或者将用户 ID 传递给侦听器,以便它可以在写入数据库之前绑定到实体上。
这是我的监听器,我省略了其中找到的 Set 方法
public class EntityPersistenceListener : IPreUpdateEventListener, IPreInsertEventListener
{
public bool OnPreUpdate(PreUpdateEvent @event)
{
var audit = @event.Entity as EntityBase;
if (audit == null)
return false;
var time = DateTimeFactory.GetDateTime();
var name = Thread.CurrentPrincipal.Identity.Name;
Set(@event.Persister, @event.State, "AddedDate", audit.AddedDate);
Set(@event.Persister, @event.State, "AddedBy", audit.AddedBy);
Set(@event.Persister, @event.State, "UpdatedDate", time);
Set(@event.Persister, @event.State, "UpdatedBy", name);
audit.AddedDate = audit.AddedDate;
audit.AddedBy = audit.AddedBy;
audit.UpdatedDate= time;
audit.UpdatedBy = name;
return false;
}
}
这里是 NServiceBus Message 模块,提取 id 并将其绑定到当前线程的标识
public class TenantAndInstanceInfoExtractor : IMessageModule
{
private readonly IBus _bus;
public TenantAndInstanceInfoExtractor(IBus bus)
{
_bus = bus;
}
public void HandleBeginMessage()
{
var headers = _bus.CurrentMessageContext.Headers;
if (headers.ContainsKey("TriggeredById"))
Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(headers["TriggeredById"]), null);
else
Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(string.Empty), null);
}
public void HandleEndMessage()
{
}
public void HandleError() { }
}
感谢 Simon 的帮助。在深入研究了我的问题并讨论了 NServiceBus 的内部工作方式之后,我采纳了您的见解并为 NServiceBus 实现了一个工作单元模块。
发生的事情是我们依赖每条消息创建的事务将我们的 NHibernate 会话提交给数据库。这发生在利用线程池的分布式事务控制器上(具体发生在此处NHibernate.Transaction.AdoNetWithDistributedTransactionFactory.DistributedTransactionContext)。
通过使用 NServiceBus 的 IManageUnitsOfWork 接口,我能够在与消息处理程序相同的线程上显式提交事务,如下面的代码示例所示。
作为对任何未来读者的旁注,这里最好的解决方案是不要使用 Thread.CurrentPrincipal,因为这个解决方案在多线程环境中失败了,就像我遇到的那样。
public class HiJumpNserviceBusUnitOfWork : IManageUnitsOfWork
{
private readonly IUnitOfWork _uow;
public HiJumpNserviceBusUnitOfWork(IUnitOfWork uow)
{
_uow = uow;
}
public void Begin()
{
_uow.ClearCache();
_uow.BeginTransaction();
}
public void End(Exception ex = null)
{
if (ex != null)
{
_uow.ClearCache();
}
else
{
_uow.CommitTransaction();
}
}
}