在 nservicebus 处理程序期间锁定

locking during nservicebus handler

我有一个场景,其中我需要一个 nservicebus 消息处理程序来防止同时执行同一 saga 的多个消息。

为了参数的缘故,处理程序做了这样的事情(在这个例子中过于简单化了(

留言:

public class MyMessage : IMessage {
  public int OrderId {get;set;}
  public int NewQuantityLevel {get;set;}
}

传奇:

public void Handle(MyMessage message)
{
    // call remote service to get current order quantity
    // do some logic and update remote service with difference between original and new quantity

    Bus.Send(new MyOtherMessage())
}

现在我是我的进程,我可以随时收到 2 条或更多这样的消息,我不想让它们检索可能已经在其他地方更新或修改的订单数量。

我考虑了几个解决方案:

  1. 获取订单的互斥锁(目前我们在一台机器上只有一个worker实例运行,但未来有可能有多个,在这种情况下我们可能会使用redis锁或类似的东西)
  2. 在服务中使用 sql 锁定在 rows/data 上进行序列化锁定(但不确定这是否有效)

这些似乎都不是最佳选择,而且我确实感觉自己在与框架作对

由于更新本身是一个漫长的 运行 过程,是否可以创建次要 saga?当更新 saga 完成其工作时,它可以向原始 saga 发出信号以继续。

至于NServiceBus如何处理并发sagas有两种情况:

  • 当多个saga开始消息进来时,只有一个会提交。其他消息将失败并通过重试获取。在第二次尝试时,saga 已经存在并且没有创建第二个实例。这将确保只创建一个传奇。

  • 当您并发访问 saga(例如更新状态)时,持久性存储并发设置生效。如果使用 RavenDB,NServiceBus 会打开 optimistic concurrency support.

NServiceBus 文档的 this page 中对此进行了更详细的记录。

如果您需要确保每个批次只存在一个 saga 实例(例如,如果您可以将 saga 与您想要锁定的 ProductId 相关联),您可以将其用作相关 ID,因此只有一个 saga每 批次 .

存在

如果您只需要一个 saga 实例(更像是一个单例 saga),您可以使用无操作关联逻辑以及自定义 saga 查找器。这样您仍然可以扩展端点并且其他处理程序/传奇不会受到影响。此技术已显示 here

传奇就是锁。

正如@Hadi 所提到的,NServiceBus 将使用开放式并发来确保一次只有一条消息可以更新 saga 实例。

不要直接在 saga 中执行更新,而是存储您正在执行更新的事实,并发送消息以执行远程服务调用到不同端点中的单独消息处理程序。将事实存储在 saga 上并发送消息以执行它,要么完成要么根本不完成。如果两条消息同时尝试执行此操作,则只有一条会成功完成。另一条消息将获得并发异常,回滚到队列并最终重试。

那时它会看到已经有一个数量更新操作正在发生。然后,您可以丢弃第二条消息或在 saga 上存储一些状态,以确保在第一个完成后发生第二个数量更新。

将远程服务调用与全双工 request/response 消息传递一起移动到 saga 之外,确保了作为流程管理器的 saga 和作为集成点的消息处理程序之间的良好分离。

伪代码

public class MySaga
{
    public void Handle(MyMessage message)
    {
        if(Data.CurrentlyUpdatingQuantity)
            return; //or schedule for later

        Data.CurrentlyUpdatingQuantity = true;

        Bus.Send(new PerformQuantityUpdateMessage(message.OrderId));    
    }

    public void Handle(QuantityUpdateResponse message)
    {
        Data.CurrentlyUpdatingQuantity = false;
        Bus.Send(new MyOtherMessage());
    }
}

单独的消息处理程序(不属于 SAGA)

public void Handle(PerformQuantityUpdateMessage message)
{
    // call remote service to get current order quantity
    // do some logic and update remote service with difference between original and new quantity

    Bus.Reply(new QuantityUpdateResponse(message.OrderId));    
}

你可能不应该在你的故事中这样做:

// call remote service to get current order quantity

相反,应该将其移出到一个单独的端点,作为流程管理器的 saga 以全双工 request/response 消息传递方式与之交互。

因此,当传奇获得启动它的第一条触发消息时,它会向 RemoteServiceInvocationEndpoint 发送请求消息,并更新其状态以指示它正在等待响应。

RemoteServiceInvocationEndpoint 在得到远程服务的响应后,会向 saga 返回一个响应消息。

当 saga 收到该响应消息时,它将知道该过程已完成,然后执行所需的任何最终操作 - 例如发送其他消息。

如果 saga 收到另一个触发消息,它可以检查它的状态并看到它已经发送了一个请求消息并且知道不要再发送一个。

正如@Hadi 在他的回复中所说,NServiceBus 中的并发控制机制将保证 saga 一次只能成功处理一条消息。