使用 Akka 进行事件溯源时如何处理 CQRS 中的 Q?
How to approach the Q in CQRS when doing Event Sourcing with Akka?
与事件溯源结合使用时,有什么好的 CQRS 方法吗?
我想到的一种方法是,一旦命令变成事件并持久化到事件日志(这些事件代表写入模型),就在命令处理程序(持久性参与者的)中执行此操作,我会使用事件总线将事件发送给感兴趣的订阅查询参与者,以便他们可以更新他们的查询模型。
我考虑的另一种方式(假设期刊支持)是使用持久性查询(通过 Akka Streams),如 allPersistenceIds
或 currentPersistenceIds
,查询端(可能的查询参与者)可以定期执行此操作。
我走的路对吗?有更好的方法吗?
我认为您首先提到的方法不会有问题。我不会说第二个。如果我对您的理解正确,您希望查询投影能够拉取更新而不是通过事件总线推送。我认为这里的问题是你必须区分你已经处理过的事件和新的事件(更新)。我不确定 Akka EventStore 日志是否可以处理这个问题,但我对此表示怀疑。
经过一些研究并承认在处理命令(上面的第一种方法)期间更新读取端可能会失败,这将涉及处理回滚和事务。更好的方法是使用持久性查询。这在很大程度上依赖于您的 Event Journal 支持持久性查询的不同功能(如 AllPersistenceIdsQuery
和 EventsByTag
),这些功能目前由 Cassandra、Event Store 等期刊支持。
这个想法是将命令端与查询端分离,其中命令端根本不需要知道有任何查询端。这种解耦是使用 Persistence Query 给我们的。这个想法是命令端应该只关心验证传入的命令并将事件持久化到事件日志中。就是这样,不知道查询端。
现在,对于查询端,您可以使用 EventsByTag
或 AllPersistenceIdsQuery
等持久性查询从日志中获取 Source[Event]
,这是 back -pressured 来自 Event Journal 的 Event
直播流,你使用这个抽象来喂养你的 Read Sides。你可以找到方法 here
让我们想想失败
如果命令端宕机怎么办?
不会将新的 Event
s 持久保存到事件日志中,读取端持久性查询将很高兴地在流中产生任何新的 Event
s。当命令端恢复时,一切都会恢复,现在读取端持久性查询将在流中看到新的 Event
s。
如果读取端之一出现故障怎么办?
这不是什么大问题,您可以擦除读取端数据库并从头开始重新启动持久性查询,然后我们就可以开始了。我们绝对可以通过使用 Resumable Projections 的概念来改进这一点。我们的想法是继续存储您阅读的每个 Event
的当前偏移量,这样如果您的阅读端出现故障,我们可以简单地从我们正在阅读的点恢复,而不必从头开始。一句忠告,如果您需要一次有效的交付,那么您可能需要考虑使用幂等过滤器来避免重复。如果你这样做,你可以做一些优化,你不需要坚持每个 ID,但只是在特定的时间间隔。
其他一些注意事项
如果您需要引入新的读取端并且它们从一开始就依赖于命令端事件怎么办?
这种解耦方法可以让您做到这一点。这个想法是,您不会从事件存储中删除事件,只需启动另一个持久性查询并让它提供给您新的读取端,并且如果您需要保持最新状态,请不要忘记使用可恢复投影。
这些方法隐含地暗示具有持久性查询和在同一系统上提供读取端和可恢复投影的逻辑。
另一种方法是将 Persistence Query 与 Akka HTTP 相结合,并公开一个流端点,该端点公开您的事件日志并允许您从开始或某个偏移量获取所有 Events/certain 事件。这种方法允许您做一些进一步的解耦,但如果您使用这种方法,您真的希望有可恢复的投影,因为现在由于引入 HTTP 而增加了失败。现在,您的读取端可以使用此流端点并在其端使用可恢复投影,并且可以以更解耦的方式引入新的读取端。
这些只是我开始使用 Akka 后收集到的一些知识。也许一些更有经验的人有更好的 CQRS 方法。
如果您正在寻找代码示例,我强烈建议您阅读 Christian Baxter 的 Mastering Akka,其中更详细地描述了该方法以及 post。
感谢阅读
与事件溯源结合使用时,有什么好的 CQRS 方法吗?
我想到的一种方法是,一旦命令变成事件并持久化到事件日志(这些事件代表写入模型),就在命令处理程序(持久性参与者的)中执行此操作,我会使用事件总线将事件发送给感兴趣的订阅查询参与者,以便他们可以更新他们的查询模型。
我考虑的另一种方式(假设期刊支持)是使用持久性查询(通过 Akka Streams),如 allPersistenceIds
或 currentPersistenceIds
,查询端(可能的查询参与者)可以定期执行此操作。
我走的路对吗?有更好的方法吗?
我认为您首先提到的方法不会有问题。我不会说第二个。如果我对您的理解正确,您希望查询投影能够拉取更新而不是通过事件总线推送。我认为这里的问题是你必须区分你已经处理过的事件和新的事件(更新)。我不确定 Akka EventStore 日志是否可以处理这个问题,但我对此表示怀疑。
经过一些研究并承认在处理命令(上面的第一种方法)期间更新读取端可能会失败,这将涉及处理回滚和事务。更好的方法是使用持久性查询。这在很大程度上依赖于您的 Event Journal 支持持久性查询的不同功能(如 AllPersistenceIdsQuery
和 EventsByTag
),这些功能目前由 Cassandra、Event Store 等期刊支持。
这个想法是将命令端与查询端分离,其中命令端根本不需要知道有任何查询端。这种解耦是使用 Persistence Query 给我们的。这个想法是命令端应该只关心验证传入的命令并将事件持久化到事件日志中。就是这样,不知道查询端。
现在,对于查询端,您可以使用 EventsByTag
或 AllPersistenceIdsQuery
等持久性查询从日志中获取 Source[Event]
,这是 back -pressured 来自 Event Journal 的 Event
直播流,你使用这个抽象来喂养你的 Read Sides。你可以找到方法 here
让我们想想失败
如果命令端宕机怎么办?
不会将新的 Event
s 持久保存到事件日志中,读取端持久性查询将很高兴地在流中产生任何新的 Event
s。当命令端恢复时,一切都会恢复,现在读取端持久性查询将在流中看到新的 Event
s。
如果读取端之一出现故障怎么办?
这不是什么大问题,您可以擦除读取端数据库并从头开始重新启动持久性查询,然后我们就可以开始了。我们绝对可以通过使用 Resumable Projections 的概念来改进这一点。我们的想法是继续存储您阅读的每个 Event
的当前偏移量,这样如果您的阅读端出现故障,我们可以简单地从我们正在阅读的点恢复,而不必从头开始。一句忠告,如果您需要一次有效的交付,那么您可能需要考虑使用幂等过滤器来避免重复。如果你这样做,你可以做一些优化,你不需要坚持每个 ID,但只是在特定的时间间隔。
其他一些注意事项
如果您需要引入新的读取端并且它们从一开始就依赖于命令端事件怎么办?
这种解耦方法可以让您做到这一点。这个想法是,您不会从事件存储中删除事件,只需启动另一个持久性查询并让它提供给您新的读取端,并且如果您需要保持最新状态,请不要忘记使用可恢复投影。
这些方法隐含地暗示具有持久性查询和在同一系统上提供读取端和可恢复投影的逻辑。
另一种方法是将 Persistence Query 与 Akka HTTP 相结合,并公开一个流端点,该端点公开您的事件日志并允许您从开始或某个偏移量获取所有 Events/certain 事件。这种方法允许您做一些进一步的解耦,但如果您使用这种方法,您真的希望有可恢复的投影,因为现在由于引入 HTTP 而增加了失败。现在,您的读取端可以使用此流端点并在其端使用可恢复投影,并且可以以更解耦的方式引入新的读取端。
这些只是我开始使用 Akka 后收集到的一些知识。也许一些更有经验的人有更好的 CQRS 方法。
如果您正在寻找代码示例,我强烈建议您阅读 Christian Baxter 的 Mastering Akka,其中更详细地描述了该方法以及 post。
感谢阅读