在尝试对 Commanded 中的传奇模式做出反应之前应用状态
Apply state before trying to react for a saga pattern in Commanded
在事件 sourcing/CQRS 框架命令中使用 Commanded.ProcessManagers.ProcessManager
模块实现传奇模式时,我 运行 遇到了问题。
在开具发票的情况下,我需要为发票实施批量创建机制。这种大规模创造既可以作为聚合体,也可以作为传奇来实现。聚合允许开始和完成批量创建。 saga 通过发出创建发票的命令并将其 ID 保持在 saga 状态来响应 "mass creation started" 事件。之后,saga 通过监听它命令存在的发票实例的成功或失败事件来跟踪发票创建的状态。一旦每个发票实例报告成功或失败,saga 应该发出停止批量创建的命令。
为此,跟踪每个发票实例及其当前状态会很有帮助:in progress
、created
或 failed
。我尝试在 apply
回调中实现它,原则上它工作得很好。
现在的问题是,apply
回调总是在handle
回调之后调用。因此,传奇状态在传奇应该做出反应后更新。这似乎违反直觉,因此,handle
回调中可用的状态不能用于正确反应。
在我看来,saga 模式在很多方面都是聚合模式的倒置。虽然首先将命令处理到域事件中然后将此域事件应用到聚合的状态中很有用,但我认为在 saga 的情况下,域事件是已经发生的事情的文档, 应在尝试对其做出反应之前应用于该状态。
现在我的问题是:有没有办法为 Commanded.ProcessManagers.ProcessManager
模块先配置 Commanded apply
然后 handle
?或者这实际上是一个错误,需要进行一般性修复?
在 handle/2
之后调用 apply/2
回调是设计使然,无法将 Commanded 配置为不同的行为。
我同意您的推理,即在尝试处理事件以生成任何命令之前将事件应用于流程管理器的状态更有意义。这似乎是对 Commanded 做出的一项值得更改的更改,可以通过您已经提出的问题进行跟踪 (#176)。
同时,您可以按如下方式实现您的流程管理器 (saga):
defmodule InvoicingProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: __MODULE__,
router: InvoicingRouter
defstruct [
:batch_uuid,
pending_invoice_ids: MapSet.new()
]
def interested?(%InvoiceBatchStarted{batch_uuid: batch_uuid}), do: {:start, batch_uuid}
def interested?(%InvoiceCreated{batch_uuid: batch_uuid}), do: {:continue, batch_uuid}
def interested?(%InvoiceFailed{batch_uuid: batch_uuid}), do: {:continue, batch_uuid}
def interested?(%InvoiceBatchStopped{batch_uuid: batch_uuid}), do: {:stop, batch_uuid}
def interested?(_event), do: false
# Event handlers
def handle(%InvoicingSaga{}, %InvoiceBatchStarted{} = started) do
%InvoiceBatchStarted{batch_uuid: batch_uuid, invoice_ids: invoice_ids} = started
Enum.map(invoice_ids, fn invoice_id ->
%CreateInvoice{
invoice_id: invoice_id,
batch_uuid: batch_uuid
}
end)
end
def handle(%InvoicingSaga{}, %InvoiceCreated{invoice_id: invoice_id}),
do: attempt_stop_batch(pm, invoice_id)
def handle(%InvoicingSaga{}, %InvoiceFailed{invoice_id: invoice_id}),
do: attempt_stop_batch(pm, invoice_id)
## State mutators
def apply(%InvoicingSaga{} = pm, %InvoiceBatchStarted{} = started) do
%InvoiceBatchStarted{batch_uuid: batch_uuid, invoice_ids: invoice_ids} = started
%InvoicingSaga{
transfer
| batch_uuid: batch_uuid,
pending_invoice_ids: MapSet.new(invoice_ids)
}
end
def apply(%InvoicingSaga{} = pm, %InvoiceCreated{invoice_id: invoice_id}) do
%InvoicingSaga{pm | pending_invoice_ids: invoice_completed(pm, invoice_id)}
end
def apply(%InvoicingSaga{} = pm, %InvoiceFailed{invoice_id: invoice_id}) do
%InvoicingSaga{pm | pending_invoice_ids: invoice_completed(pm, invoice_id)}
end
## Private helpers
def attempt_stop_batch(%InvoicingSaga{batch_uuid: batch_uuid} = pm, invoice_id) do
pending_invoices = invoice_completed(pm, invoice_id)
case empty?(pending_invoices) do
true -> %StopInvoiceBatch{batch_uuid: batch_uuid}
false -> []
end
end
defp invoice_completed(%InvoicingSaga{pending_invoice_ids: pending_invoice_ids}, invoice_id) do
MapSet.delete(pending_invoice_ids, invoice_id)
end
defp empty?(map_set, empty \ MapSet.new())
defp empty?(%MapSet{} = empty, %MapSet{} = empty), do: true
defp empty?(%MapSet{}, %MapSet{}), do: false
end
在事件 sourcing/CQRS 框架命令中使用 Commanded.ProcessManagers.ProcessManager
模块实现传奇模式时,我 运行 遇到了问题。
在开具发票的情况下,我需要为发票实施批量创建机制。这种大规模创造既可以作为聚合体,也可以作为传奇来实现。聚合允许开始和完成批量创建。 saga 通过发出创建发票的命令并将其 ID 保持在 saga 状态来响应 "mass creation started" 事件。之后,saga 通过监听它命令存在的发票实例的成功或失败事件来跟踪发票创建的状态。一旦每个发票实例报告成功或失败,saga 应该发出停止批量创建的命令。
为此,跟踪每个发票实例及其当前状态会很有帮助:in progress
、created
或 failed
。我尝试在 apply
回调中实现它,原则上它工作得很好。
现在的问题是,apply
回调总是在handle
回调之后调用。因此,传奇状态在传奇应该做出反应后更新。这似乎违反直觉,因此,handle
回调中可用的状态不能用于正确反应。
在我看来,saga 模式在很多方面都是聚合模式的倒置。虽然首先将命令处理到域事件中然后将此域事件应用到聚合的状态中很有用,但我认为在 saga 的情况下,域事件是已经发生的事情的文档, 应在尝试对其做出反应之前应用于该状态。
现在我的问题是:有没有办法为 Commanded.ProcessManagers.ProcessManager
模块先配置 Commanded apply
然后 handle
?或者这实际上是一个错误,需要进行一般性修复?
在 handle/2
之后调用 apply/2
回调是设计使然,无法将 Commanded 配置为不同的行为。
我同意您的推理,即在尝试处理事件以生成任何命令之前将事件应用于流程管理器的状态更有意义。这似乎是对 Commanded 做出的一项值得更改的更改,可以通过您已经提出的问题进行跟踪 (#176)。
同时,您可以按如下方式实现您的流程管理器 (saga):
defmodule InvoicingProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: __MODULE__,
router: InvoicingRouter
defstruct [
:batch_uuid,
pending_invoice_ids: MapSet.new()
]
def interested?(%InvoiceBatchStarted{batch_uuid: batch_uuid}), do: {:start, batch_uuid}
def interested?(%InvoiceCreated{batch_uuid: batch_uuid}), do: {:continue, batch_uuid}
def interested?(%InvoiceFailed{batch_uuid: batch_uuid}), do: {:continue, batch_uuid}
def interested?(%InvoiceBatchStopped{batch_uuid: batch_uuid}), do: {:stop, batch_uuid}
def interested?(_event), do: false
# Event handlers
def handle(%InvoicingSaga{}, %InvoiceBatchStarted{} = started) do
%InvoiceBatchStarted{batch_uuid: batch_uuid, invoice_ids: invoice_ids} = started
Enum.map(invoice_ids, fn invoice_id ->
%CreateInvoice{
invoice_id: invoice_id,
batch_uuid: batch_uuid
}
end)
end
def handle(%InvoicingSaga{}, %InvoiceCreated{invoice_id: invoice_id}),
do: attempt_stop_batch(pm, invoice_id)
def handle(%InvoicingSaga{}, %InvoiceFailed{invoice_id: invoice_id}),
do: attempt_stop_batch(pm, invoice_id)
## State mutators
def apply(%InvoicingSaga{} = pm, %InvoiceBatchStarted{} = started) do
%InvoiceBatchStarted{batch_uuid: batch_uuid, invoice_ids: invoice_ids} = started
%InvoicingSaga{
transfer
| batch_uuid: batch_uuid,
pending_invoice_ids: MapSet.new(invoice_ids)
}
end
def apply(%InvoicingSaga{} = pm, %InvoiceCreated{invoice_id: invoice_id}) do
%InvoicingSaga{pm | pending_invoice_ids: invoice_completed(pm, invoice_id)}
end
def apply(%InvoicingSaga{} = pm, %InvoiceFailed{invoice_id: invoice_id}) do
%InvoicingSaga{pm | pending_invoice_ids: invoice_completed(pm, invoice_id)}
end
## Private helpers
def attempt_stop_batch(%InvoicingSaga{batch_uuid: batch_uuid} = pm, invoice_id) do
pending_invoices = invoice_completed(pm, invoice_id)
case empty?(pending_invoices) do
true -> %StopInvoiceBatch{batch_uuid: batch_uuid}
false -> []
end
end
defp invoice_completed(%InvoicingSaga{pending_invoice_ids: pending_invoice_ids}, invoice_id) do
MapSet.delete(pending_invoice_ids, invoice_id)
end
defp empty?(map_set, empty \ MapSet.new())
defp empty?(%MapSet{} = empty, %MapSet{} = empty), do: true
defp empty?(%MapSet{}, %MapSet{}), do: false
end