在尝试对 Commanded 中的传奇模式做出反应之前应用状态

Apply state before trying to react for a saga pattern in Commanded

在事件 sourcing/CQRS 框架命令中使用 Commanded.ProcessManagers.ProcessManager 模块实现传奇模式时,我 运行 遇到了问题。

在开具发票的情况下,我需要为发票实施批量创建机制。这种大规模创造既可以作为聚合体,也可以作为传奇来实现。聚合允许开始和完成批量创建。 saga 通过发出创建发票的命令并将其 ID 保持在 saga 状态来响应 "mass creation started" 事件。之后,saga 通过监听它命令存在的发票实例的成功或失败事件来跟踪发票创建的状态。一旦每个发票实例报告成功或失败,saga 应该发出停止批量创建的命令。

为此,跟踪每个发票实例及其当前状态会很有帮助:in progresscreatedfailed。我尝试在 apply 回调中实现它,原则上它工作得很好。

现在的问题是,apply回调总是在handle回调之后调用。因此,传奇状态在传奇应该做出反应后更新。这似乎违反直觉,因此,handle 回调中可用的状态不能用于正确反应。

在我看来,saga 模式在很多方面都是聚合模式的倒置。虽然首先将命令处理到域事件中然后将此域事件应用到聚合的状态中很有用,但我认为在 saga 的情况下,域事件是已经发生的事情的文档, 应在尝试对其做出反应之前应用于该状态。

现在我的问题是:有没有办法为 Commanded.ProcessManagers.ProcessManager 模块先配置 Commanded apply 然后 handle?或者这实际上是一个错误,需要进行一般性修复?

handle/2 之后调用 apply/2 回调是设计使然,无法将 Commanded 配置为不同的行为。

我同意您的推理,即在尝试处理事件以生成任何命令之前将事件应用于流程管理器的状态更有意义。这似乎是对 Commanded 做出的一项值得更改的更改,可以通过您已经提出的问题进行跟踪 (#176)。

同时,您可以按如下方式实现您的流程管理器 (saga):

defmodule InvoicingProcessManager do
  use Commanded.ProcessManagers.ProcessManager,
    name: __MODULE__,
    router: InvoicingRouter

  defstruct [
    :batch_uuid,
    pending_invoice_ids: MapSet.new()
  ]

  def interested?(%InvoiceBatchStarted{batch_uuid: batch_uuid}), do: {:start, batch_uuid}
  def interested?(%InvoiceCreated{batch_uuid: batch_uuid}), do: {:continue, batch_uuid}
  def interested?(%InvoiceFailed{batch_uuid: batch_uuid}), do: {:continue, batch_uuid}
  def interested?(%InvoiceBatchStopped{batch_uuid: batch_uuid}), do: {:stop, batch_uuid}
  def interested?(_event), do: false

  # Event handlers

  def handle(%InvoicingSaga{}, %InvoiceBatchStarted{} = started) do
    %InvoiceBatchStarted{batch_uuid: batch_uuid, invoice_ids: invoice_ids} = started

    Enum.map(invoice_ids, fn invoice_id ->
      %CreateInvoice{
        invoice_id: invoice_id,
        batch_uuid: batch_uuid
      }
    end)
  end

  def handle(%InvoicingSaga{}, %InvoiceCreated{invoice_id: invoice_id}),
    do: attempt_stop_batch(pm, invoice_id)

  def handle(%InvoicingSaga{}, %InvoiceFailed{invoice_id: invoice_id}),
    do: attempt_stop_batch(pm, invoice_id)

  ## State mutators

  def apply(%InvoicingSaga{} = pm, %InvoiceBatchStarted{} = started) do
    %InvoiceBatchStarted{batch_uuid: batch_uuid, invoice_ids: invoice_ids} = started

    %InvoicingSaga{
      transfer
      | batch_uuid: batch_uuid,
        pending_invoice_ids: MapSet.new(invoice_ids)
    }
  end

  def apply(%InvoicingSaga{} = pm, %InvoiceCreated{invoice_id: invoice_id}) do
    %InvoicingSaga{pm | pending_invoice_ids: invoice_completed(pm, invoice_id)}
  end

  def apply(%InvoicingSaga{} = pm, %InvoiceFailed{invoice_id: invoice_id}) do
    %InvoicingSaga{pm | pending_invoice_ids: invoice_completed(pm, invoice_id)}
  end

  ## Private helpers

  def attempt_stop_batch(%InvoicingSaga{batch_uuid: batch_uuid} = pm, invoice_id) do
    pending_invoices = invoice_completed(pm, invoice_id)

    case empty?(pending_invoices) do
      true -> %StopInvoiceBatch{batch_uuid: batch_uuid}
      false -> []
    end
  end

  defp invoice_completed(%InvoicingSaga{pending_invoice_ids: pending_invoice_ids}, invoice_id) do
    MapSet.delete(pending_invoice_ids, invoice_id)
  end

  defp empty?(map_set, empty \ MapSet.new())
  defp empty?(%MapSet{} = empty, %MapSet{} = empty), do: true
  defp empty?(%MapSet{}, %MapSet{}), do: false
end