重新启动应用程序时事件处理程序重播?

Event handler replays when restarting app?

我正在本地玩 axon 服务器。我通过 运行 命令 docker run -d --name axonserver -p 8024:8024 -p 8124:8124 axoniq/axonserver.

在我的本地机器上安装了一个 docker 容器

当我启动我的 spring-boot 应用程序时,我的外部事件处理程序聚合重新运行s 所有以前的事件,所以我在启动时看到这个日志语句流。

同一事件处理程序还向聚合发布命令,然后聚合将新事件附加到处理命令的聚合。因此,每次我重新启动应用程序时,我的聚合最终都会在它的末尾添加一些事件,而我只想要或期望一个事件。

flsh.axon.LetterSchedulingHandler   : Sending letter 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f...
flsh.axon.LetterSchedulingHandler   : Sending letter 6b4f6966-85ea-46e0-9c49-21bcd501a1b5...
flsh.axon.LetterSchedulingHandler   : Sending letter fc36292f-c7bd-4575-b56f-130624a87466...

flsh.axon.Letter                    : LetterScheduledEvent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SCHEDULED
flsh.axon.Letter                    : LetterScheduledEvent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SCHEDULED
flsh.axon.Letter                    : LetterScheduledEvent fc36292f-c7bd-4575-b56f-130624a87466 SCHEDULED
flsh.axon.Letter                    : Letter sent fc36292f-c7bd-4575-b56f-130624a87466 SENT
flsh.axon.Letter                    : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter                    : Letter sent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SENT
flsh.axon.Letter                    : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter                    : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter                    : Letter sent fc36292f-c7bd-4575-b56f-130624a87466 SENT
flsh.axon.Letter                    : Letter sent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SENT

我的事件处理程序如下所示:

@Slf4j
@Component
public class LetterSchedulingHandler {

    private final CommandGateway commandGateway;

    public LetterSchedulingHandler(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    @DisallowReplay //this doesn't seem to work
    @EventHandler
    public void handle(BeginSendLetterEvent event) {
        log.info("Sending letter {}...", event.getLetterId());
        commandGateway.send(new LetterSentCommand(event.getLetterId()));
    }
}
@CommandHandler
public void handle(LetterSentCommand cmd) {
    AggregateLifecycle.apply(new LetterSentEvent(cmd.getLetterId()));
}

这些事件正在通过调度程序发布...即通过不同的 @CommandHandler 聚合 运行,否则会按预期成功 运行。

@CommandHandler
public Letter(ScheduleLetterCommand cmd, EventScheduler scheduler) {
    String id = cmd.getLetterId();
    log.info("Received schedule command for letter id {}", id);
    ScheduleToken scheduleToken = scheduler.schedule(Duration.ofSeconds(5), new BeginSendLetterEvent(id));
    AggregateLifecycle.apply(new LetterScheduledEvent(id, scheduleToken));
}

@DisallowReplay 注释似乎无法阻止这种情况。此外,我尝试按照 directions here 使处理程序成为 Subscribing 事件处理器,但要么我没有做对,要么也没有解决问题。

axon:
  axonserver:
    servers: localhost
  eventhandling:
    processors:
      LetterSchedulingHandler:
        mode: subscribing

你错过的@GoldFLsh,是这两个之一:

  1. 正确引用 LetterSchedulingHandler
  2. 后面的 EventProcessor
  3. 定义 LetterSchedulingHandler
  4. @ProcessingGroup

您可能已经在 Event Processors 的参考指南中阅读过,因为任何事件处理组件(例如阅读您的 LetterSchedulingHandler)都将与订阅中的其他事件处理组件组合在一起或跟踪事件处理器。

但是您在此阶段还没有配置任何处理组名称。这意味着 LetterSchedulingHandler 的处理组将默认为处理程序的包名称。因此,在您的属性文件中,您应该使用包名称而不是 LetterSchedulingHandler 来将其定义为正在订阅。

但更清楚的是,在 LetterSchedulingHandler 上添加 @ProcessingGroup 注释,给它一个清晰的名称,然后您可以在属性文件中使用它。

最后,您没有正确引用事件处理器这一事实是您会看到事件重播的原因。 Axon 默认为 TrackingEventProcessor,它跟踪处理流事件的程度。如果没有持久化单元来存储这些TrackingTokens,它总是会从头开始。