对 Axon 事件溯源的理解
Understanding of Axon Event Sourcing
我一直在学习轴突和事件溯源,我想我终于理解了其中的一部分并且它在我脑海中是有意义的,但我想确保我对它的理解是正确的并且我不是犯任何错误。代码有效,我也可以在 DOMAIN_EVENT_ENTRY table 中看到事件。
我将在下面 post 我的代码(来自文档的简单 GiftCard 示例)并解释我的思考过程。如果我没有理解正确,请你帮助我以正确的方式理解那部分。
我没有包含 commands/events,因为它们非常简单,包含 id、金额字段
首先是我的代码:
TestRunner.java
package com.example.demoaxon;
import java.util.UUID;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class TestRunner implements CommandLineRunner {
private final CommandGateway commandGateway;
@Autowired
public TestRunner(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}
@Override
public void run(String... args) throws Exception {
log.info("send command");
String id = UUID.randomUUID().toString();
commandGateway.sendAndWait(new IssueCardCommand(id,100));
commandGateway.sendAndWait(new RedeemCardCommand(id,90));
}
}
GiftCard.java
package com.example.demoaxon;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
import org.axonframework.spring.stereotype.Aggregate;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@NoArgsConstructor
@Aggregate
@Slf4j
public class GiftCard {
@AggregateIdentifier
private String giftCardId;
private Integer amount;
@CommandHandler
public GiftCard(IssueCardCommand cmd) {
log.info("handling {}",cmd);
apply(new CardIssuedEvent(cmd.getCardId(),cmd.getAmount()));
}
@EventSourcingHandler
public void onCardIssuedEvent(CardIssuedEvent evt) {
log.info("applying {}",evt);
this.giftCardId = evt.getCardId();
this.amount = evt.getAmount();
}
@CommandHandler
public void redeemCardCommandHandler(RedeemCardCommand cmd) {
log.info("handling {}",cmd);
this.amount -= cmd.getAmount();
apply(new CardRedeemedEvent(cmd.getCardId(),cmd.getTransactionId(),this.amount));
}
@EventSourcingHandler
public void onCardRedeemedEvent(CardRedeemedEvent evt) {
log.info("applying {}",evt);
this.amount = evt.getAmount();
}
}
据我了解:
在我的 TestRunner
class 中,命令网关使用命令总线将 issueCardCommand
分派给它的 CommandHandler
,然后创建一个GiftCard
聚合的新实例。在这个CommandHandler
中我们可以执行任何逻辑,然后我们使用这个apply
方法。
apply(event)
方法用于将 CardIssuedEvent
作为 GiftCard
聚合范围内的 EventMessage
发布,它还调用EventSourcingHandler
对于该特定事件,因此在这种情况下 onCardIssuedEvent
。它将 EventMessage 发布到 EventBus 并发送到 EventHandlers。
在 @EventSourcingHandler onCardIssuedEvent
中,我们可以对 GiftCard
聚合进行任何状态更改,我们还将事件持久化到 DOMAIN_EVENT_ENTRY
table 使用 spring Jpa.
此CommandHandler
执行完毕后,聚合对象将不再存在。
现在再次在我的 TestRunner
class 中,命令网关将 RedeemCardCommand
分派到它的 CommandHandler
并且因为第一个命令不再存在,空的无参数构造函数用于创建对象。 axon 框架从 DOMAIN_EVENT_ENTRY
table 中检索所有事件,并重播 GiftCard
聚合实例的所有事件 (EventSourcingHandlers) 以获得它的当前状态(这就是为什么 @AggregateIdentifier
很重要)。
然后执行 RedeemCardCommandHandler
方法,它执行任何逻辑并应用事件,该事件在聚合中发布并调用它的 EventSourcingHandler
。 EventSourcingHandler
然后更新 GiftCard
聚合的状态/持续到 DOMAIN_EVENT_ENTRY
table.
我对事件溯源工作原理的理解是否正确?
是的,看起来您正在正确使用命令网关,并且聚合具有使用正确内容注释的正确方法!您正在为分布式事务使用 Saga 设计模式,这比 2PC(两阶段提交)要好得多,因此每一步调用下一步都是正确的做法。
只要开始的saga方法用@StartSaga
注解,最后的方法用@EndSaga
注解,那么这些步骤就会按顺序进行,回滚也应该可以正常进行
saveAndWait
函数实际上 returns 一个 CompletableFuture
,所以如果你愿意,你可以在调试模式下单步执行线程并暂停线程直到整个 saga 完成如果那是你想做的。
我唯一关心的是 - 您是使用 Axon 服务器作为事件源,还是 Axon 支持的其他事件总线或源? Standard Axon Server 的唯一问题是它是一个 Free Tier
产品,他们提供更好、更受支持的版本,如 Axon Server Enterprise
(它提供更多功能 - 更适合基础设施) - 所以如果你是为一家公司做这件事,那么他们可能愿意为更高级别的额外支持付费...
Axon Framework 实际上支持 Spring AMQP,这很有用,因为这样您就可以更好地控制自己的基础设施,而不是被束缚在付费的 Axon Server
上——我不其实是好还是不好。
如果你能让它与自定义事件一起工作 Bus/Sourcer,那么你的系统将非常好 - 结合在 Axon 上开发的便利性。
只要聚合上有正确的注释,并且您的命令网关已正确自动连接,那么一切都应该可以正常工作。我唯一担心的是 Axon Framework
还没有足够的支持...
让我尝试引导您完成这段旅程!
您的理解几乎完全正确。我只想将其拆分为事件溯源和 Axon 以便更好地理解。
- 事件溯源
简而言之,事件溯源是一种通过过去发生的事件的历史来存储应用程序状态的方法。请记住,您还有其他模式,例如状态存储聚合。
在您的示例中,您使用的是事件溯源,因此 @EventSourcingHandler
就位。现在进入下一个话题。
- 轴突
我建议您阅读我们一位同事的这篇文章 awesome blog,特别是其中包含的幻灯片,您将在 Axon Framework 中看到一条消息的旅程!
关于你的观点,我想先澄清一下:
- 正确,您分派了一个
Command
并且由于注释的方法是一个构造函数,它将为您创建一个聚合。 CommandHanlder
s 是业务逻辑和验证的正确位置。
- 此处
apply
将在内部发布消息(向此 Aggregate
以及他们的 Entities
/AggregateMember
发布),然后再向 EventBus
.来自 javadoc:
The event should be applied to the aggregate immediately and scheduled for publication to other event handlers.
因为我们讨论的是事件溯源,所以所有 EventSourcingHandler
都会被调用,Aggregate
modified/updated 的状态也会被调用。这很重要,因为如前所述,这是您在需要时重建聚合状态的方式。但是事件的持久化不会在这里发生,它已经被安排在这个过程完成时发生。
正确。
也正确。这与一般的事件溯源以及它如何重建您的 Aggregate
.
有关
关于事件发生时间 published/saved/persisted.
的第 3 点观察结果也得到纠正
关于您的代码的另一个小注释:您正在 @CommandHandler
上执行此操作
@CommandHandler
public void redeemCardCommandHandler(RedeemCardCommand cmd) {
log.info("handling {}", cmd);
this.amount -= cmd.getAmount(); // you should not do this here
apply(new CardRedeemedEvent(cmd.getCardId(), cmd.getTransactionId(), this.amount));
}
此状态更改应在 @EventSourcingHandler
中进行。在 @CommandHandler
上,只有当此聚合有足够的 'money' 可以兑换时,您才应该进行验证:)
我一直在学习轴突和事件溯源,我想我终于理解了其中的一部分并且它在我脑海中是有意义的,但我想确保我对它的理解是正确的并且我不是犯任何错误。代码有效,我也可以在 DOMAIN_EVENT_ENTRY table 中看到事件。
我将在下面 post 我的代码(来自文档的简单 GiftCard 示例)并解释我的思考过程。如果我没有理解正确,请你帮助我以正确的方式理解那部分。
我没有包含 commands/events,因为它们非常简单,包含 id、金额字段
首先是我的代码:
TestRunner.java
package com.example.demoaxon;
import java.util.UUID;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class TestRunner implements CommandLineRunner {
private final CommandGateway commandGateway;
@Autowired
public TestRunner(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}
@Override
public void run(String... args) throws Exception {
log.info("send command");
String id = UUID.randomUUID().toString();
commandGateway.sendAndWait(new IssueCardCommand(id,100));
commandGateway.sendAndWait(new RedeemCardCommand(id,90));
}
}
GiftCard.java
package com.example.demoaxon;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
import org.axonframework.spring.stereotype.Aggregate;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@NoArgsConstructor
@Aggregate
@Slf4j
public class GiftCard {
@AggregateIdentifier
private String giftCardId;
private Integer amount;
@CommandHandler
public GiftCard(IssueCardCommand cmd) {
log.info("handling {}",cmd);
apply(new CardIssuedEvent(cmd.getCardId(),cmd.getAmount()));
}
@EventSourcingHandler
public void onCardIssuedEvent(CardIssuedEvent evt) {
log.info("applying {}",evt);
this.giftCardId = evt.getCardId();
this.amount = evt.getAmount();
}
@CommandHandler
public void redeemCardCommandHandler(RedeemCardCommand cmd) {
log.info("handling {}",cmd);
this.amount -= cmd.getAmount();
apply(new CardRedeemedEvent(cmd.getCardId(),cmd.getTransactionId(),this.amount));
}
@EventSourcingHandler
public void onCardRedeemedEvent(CardRedeemedEvent evt) {
log.info("applying {}",evt);
this.amount = evt.getAmount();
}
}
据我了解:
在我的
TestRunner
class 中,命令网关使用命令总线将issueCardCommand
分派给它的CommandHandler
,然后创建一个GiftCard
聚合的新实例。在这个CommandHandler
中我们可以执行任何逻辑,然后我们使用这个apply
方法。apply(event)
方法用于将CardIssuedEvent
作为GiftCard
聚合范围内的EventMessage
发布,它还调用EventSourcingHandler
对于该特定事件,因此在这种情况下onCardIssuedEvent
。它将 EventMessage 发布到 EventBus 并发送到 EventHandlers。在
@EventSourcingHandler onCardIssuedEvent
中,我们可以对GiftCard
聚合进行任何状态更改,我们还将事件持久化到DOMAIN_EVENT_ENTRY
table 使用 spring Jpa.此
CommandHandler
执行完毕后,聚合对象将不再存在。现在再次在我的
TestRunner
class 中,命令网关将RedeemCardCommand
分派到它的CommandHandler
并且因为第一个命令不再存在,空的无参数构造函数用于创建对象。 axon 框架从DOMAIN_EVENT_ENTRY
table 中检索所有事件,并重播GiftCard
聚合实例的所有事件 (EventSourcingHandlers) 以获得它的当前状态(这就是为什么@AggregateIdentifier
很重要)。然后执行
RedeemCardCommandHandler
方法,它执行任何逻辑并应用事件,该事件在聚合中发布并调用它的EventSourcingHandler
。EventSourcingHandler
然后更新GiftCard
聚合的状态/持续到DOMAIN_EVENT_ENTRY
table.
我对事件溯源工作原理的理解是否正确?
是的,看起来您正在正确使用命令网关,并且聚合具有使用正确内容注释的正确方法!您正在为分布式事务使用 Saga 设计模式,这比 2PC(两阶段提交)要好得多,因此每一步调用下一步都是正确的做法。
只要开始的saga方法用@StartSaga
注解,最后的方法用@EndSaga
注解,那么这些步骤就会按顺序进行,回滚也应该可以正常进行
saveAndWait
函数实际上 returns 一个 CompletableFuture
,所以如果你愿意,你可以在调试模式下单步执行线程并暂停线程直到整个 saga 完成如果那是你想做的。
我唯一关心的是 - 您是使用 Axon 服务器作为事件源,还是 Axon 支持的其他事件总线或源? Standard Axon Server 的唯一问题是它是一个 Free Tier
产品,他们提供更好、更受支持的版本,如 Axon Server Enterprise
(它提供更多功能 - 更适合基础设施) - 所以如果你是为一家公司做这件事,那么他们可能愿意为更高级别的额外支持付费...
Axon Framework 实际上支持 Spring AMQP,这很有用,因为这样您就可以更好地控制自己的基础设施,而不是被束缚在付费的 Axon Server
上——我不其实是好还是不好。
如果你能让它与自定义事件一起工作 Bus/Sourcer,那么你的系统将非常好 - 结合在 Axon 上开发的便利性。
只要聚合上有正确的注释,并且您的命令网关已正确自动连接,那么一切都应该可以正常工作。我唯一担心的是 Axon Framework
还没有足够的支持...
让我尝试引导您完成这段旅程!
您的理解几乎完全正确。我只想将其拆分为事件溯源和 Axon 以便更好地理解。
- 事件溯源
简而言之,事件溯源是一种通过过去发生的事件的历史来存储应用程序状态的方法。请记住,您还有其他模式,例如状态存储聚合。
在您的示例中,您使用的是事件溯源,因此 @EventSourcingHandler
就位。现在进入下一个话题。
- 轴突
我建议您阅读我们一位同事的这篇文章 awesome blog,特别是其中包含的幻灯片,您将在 Axon Framework 中看到一条消息的旅程!
关于你的观点,我想先澄清一下:
- 正确,您分派了一个
Command
并且由于注释的方法是一个构造函数,它将为您创建一个聚合。CommandHanlder
s 是业务逻辑和验证的正确位置。 - 此处
apply
将在内部发布消息(向此Aggregate
以及他们的Entities
/AggregateMember
发布),然后再向EventBus
.来自 javadoc:
The event should be applied to the aggregate immediately and scheduled for publication to other event handlers.
因为我们讨论的是事件溯源,所以所有
EventSourcingHandler
都会被调用,Aggregate
modified/updated 的状态也会被调用。这很重要,因为如前所述,这是您在需要时重建聚合状态的方式。但是事件的持久化不会在这里发生,它已经被安排在这个过程完成时发生。正确。
也正确。这与一般的事件溯源以及它如何重建您的
有关Aggregate
.关于事件发生时间 published/saved/persisted.
的第 3 点观察结果也得到纠正
关于您的代码的另一个小注释:您正在 @CommandHandler
@CommandHandler
public void redeemCardCommandHandler(RedeemCardCommand cmd) {
log.info("handling {}", cmd);
this.amount -= cmd.getAmount(); // you should not do this here
apply(new CardRedeemedEvent(cmd.getCardId(), cmd.getTransactionId(), this.amount));
}
此状态更改应在 @EventSourcingHandler
中进行。在 @CommandHandler
上,只有当此聚合有足够的 'money' 可以兑换时,您才应该进行验证:)