轴突和 CompletableFuture

Axon & CompletableFuture

我在尝试将 CompletableFuture 与 Axon 结合使用时遇到了问题。 例如:

CompletableFuture future = CompletableFuture.supplyAsync(() -> {

            log.info("Start processing target: {}", target.toString());
            return new Event();

        }, threadPool);

future.thenAcceptAsync(event -> {
            log.info("Send Event");
            AggregateLifecycle.apply(event);
}, currentExecutor);

在 thenAcceptAsync - AggregateLifecycle.apply(event) 中有意外行为。我的一些 @EventSourcingHandler 处理程序开始处理事件两次。有人知道怎么解决吗?

我一直在阅读文档,我得到的一切都是:

In most cases, the DefaultUnitOfWork will provide you with the functionality you need. It expects processing to happen within a single thread.

所以,看来我应该以某种方式使用 CurrentUnitOfWork.get/set 方法,但仍然无法理解 Axon API.

您不应该 apply() 异步事件。 apply() 方法将调用聚合的内部 @EventSourcingHandler 方法并安排事件在工作单元完成(成功)时发布。 Axon 与工作单元(协调单个消息处理程序调用的 activity)一起工作的方式,必须在管理该工作单元的线程中调用 apply() 方法。

如果您想要异步发布事件,请使用使用异步传输的事件总线,并使用跟踪处理器。