使用 EventScheduler 不会触发 Aggregate 中的事件处理程序

Using EventScheduler does not trigger event handlers in Aggregate

当我通过命令触发预定事件时,我没有看到预期的事件处理程序触发器。我正在尝试在 Saga 中隔离一次性业务事务,同时仍然允许聚合以事件为源,以便能够重播状态更改。


我配置了以下SimpleEventScheduler.

@Bean
public SimpleEventScheduler simpleEventScheduler(EventBus eventBus) {
    return SimpleEventScheduler.builder()
        .eventBus(eventBus)
        .scheduledExecutorService(scheduledExecutorService())
        .build();
    }

private ScheduledExecutorService scheduledExecutorService() {
    return Executors.unconfigurableScheduledExecutorService(Executors.newSingleThreadScheduledExecutor());
}

我有一个具有 @CommandHandler

的聚合模型
@CommandHandler
public Letter(ScheduleLetterCommand cmd, EventScheduler scheduler) {
    String id = cmd.getLetterId();
    log.info("Received schedule command for letter id {}", id);
    ScheduleToken scheduleToken = scheduler.schedule(Duration.ofSeconds(5), new BeginSendLetterEvent(id, LetterEventType.BEGIN_SEND));
    AggregateLifecycle.apply(new LetterScheduledEvent(id, LetterEventType.SCHEDULED, scheduleToken));
}

和两个@EventSourcingHandler

@EventSourcingHandler
public void on(BeginSendLetterEvent event) {
    log.info("Letter sending process started {} {}", event.getLetterId(), event.getEventType());
    scheduleToken = null;
}

@EventSourcingHandler
public void on(LetterSentEvent event) {
    log.info("Letter sent {} {}", event.getLetterId(), event.getEventType());
    this.sent = true;
}

我有一个 saga,当 BeginSendLetterEvent 被触发并发布 LetterSentEvent.

时,它会做一些 'business logic'

@Saga
@Slf4j
public class LetterSchedulingSaga {

    private EventGateway eventGateway;

    public LetterSchedulingSaga() {
        //Axon requires empty constructor
    }

    @StartSaga
    @EndSaga
    @SagaEventHandler(associationProperty = "letterId")
    public void handle(BeginSendLetterEvent event) {
        log.info("Sending letter {}...", event.getLetterId());
        eventGateway.publish(new LetterSentEvent(event.getLetterId(), LetterEventType.SENT));
    }

    @Autowired
    public void setEventGateway(EventGateway eventGateway) {
        this.eventGateway = eventGateway;
    }
}

这是我的输出:

com.flsh.web.LetterScheduler            : Received request to schedule letter
com.flsh.web.LetterScheduler            : Finished request to schedule letter
com.flsh.axon.Letter                    : Received schedule command for letter id b7338082-e0e1-4ba0-b137-c7ff92afe3a1
com.flsh.axon.Letter                    : LetterScheduledEvent b7338082-e0e1-4ba0-b137-c7ff92afe3a1 SCHEDULED
com.flsh.axon.LetterSchedulingSaga      : Sending letter b7338082-e0e1-4ba0-b137-c7ff92afe3a1...

问题是我根本没有看到上述两个事件处理程序被触发。有人可以看到我在这里做错了什么吗? :) 任何帮助将不胜感激...

如果这是使用 Sagas 和事件处理程序的错误方式,请告诉我。我意识到我的基本示例并不能促进良好的域模型。

对@GoldFish 问题的简短回答是,您希望在命令模型中处理事件。

Axon 术语中的聚合是一个命令处理组件,因此在考虑 CQRS 时它是命令模型的一部分。 因此,它处理命令消息并验证是否可以执行给定操作(读取:命令)。如果验证的结果是 "yes",那就是您最终将在给定聚合实例的生命周期中发布一个事件。

可以引入聚合的 @EventSourcingHandler 注释方法是 "source the aggregate instance based on its own events"。

话虽如此,您可以预期聚合将永远不会 直接处理来自任何其他来源而不是它自己的事件。

EventScheduler 是事件的外部来源,就像采购时另一个聚合的事件一样。因此,它们将被忽略。

EventScheduler 会在后期发布一个事件,以便它可以被事件处理组件处理,例如 Saga 实例。 如果您想为 特定的 聚合或传奇实例安排某些事情发生,您应该查看 DeadlineManager

无论如何,对于您要实现的目标(我相信)是从传奇中触发聚合中的操作,您应该使用命令消息,因为聚合只能处理命令消息。