轴突和 CompletableFuture
Axon & CompletableFuture
我在尝试将 CompletableFuture 与 Axon 结合使用时遇到了问题。
例如:
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
log.info("Start processing target: {}", target.toString());
return new Event();
}, threadPool);
future.thenAcceptAsync(event -> {
log.info("Send Event");
AggregateLifecycle.apply(event);
}, currentExecutor);
在 thenAcceptAsync - AggregateLifecycle.apply(event) 中有意外行为。我的一些 @EventSourcingHandler 处理程序开始处理事件两次。有人知道怎么解决吗?
我一直在阅读文档,我得到的一切都是:
In most cases, the DefaultUnitOfWork will provide you with the
functionality you need. It expects processing to happen within a
single thread.
所以,看来我应该以某种方式使用 CurrentUnitOfWork.get/set 方法,但仍然无法理解 Axon API.
您不应该 apply()
异步事件。 apply()
方法将调用聚合的内部 @EventSourcingHandler 方法并安排事件在工作单元完成(成功)时发布。
Axon 与工作单元(协调单个消息处理程序调用的 activity)一起工作的方式,必须在管理该工作单元的线程中调用 apply() 方法。
如果您想要异步发布事件,请使用使用异步传输的事件总线,并使用跟踪处理器。
我在尝试将 CompletableFuture 与 Axon 结合使用时遇到了问题。 例如:
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
log.info("Start processing target: {}", target.toString());
return new Event();
}, threadPool);
future.thenAcceptAsync(event -> {
log.info("Send Event");
AggregateLifecycle.apply(event);
}, currentExecutor);
在 thenAcceptAsync - AggregateLifecycle.apply(event) 中有意外行为。我的一些 @EventSourcingHandler 处理程序开始处理事件两次。有人知道怎么解决吗?
我一直在阅读文档,我得到的一切都是:
In most cases, the DefaultUnitOfWork will provide you with the functionality you need. It expects processing to happen within a single thread.
所以,看来我应该以某种方式使用 CurrentUnitOfWork.get/set 方法,但仍然无法理解 Axon API.
您不应该 apply()
异步事件。 apply()
方法将调用聚合的内部 @EventSourcingHandler 方法并安排事件在工作单元完成(成功)时发布。
Axon 与工作单元(协调单个消息处理程序调用的 activity)一起工作的方式,必须在管理该工作单元的线程中调用 apply() 方法。
如果您想要异步发布事件,请使用使用异步传输的事件总线,并使用跟踪处理器。