重新启动应用程序时事件处理程序重播?
Event handler replays when restarting app?
我正在本地玩 axon 服务器。我通过 运行 命令 docker run -d --name axonserver -p 8024:8024 -p 8124:8124 axoniq/axonserver
.
在我的本地机器上安装了一个 docker 容器
当我启动我的 spring-boot 应用程序时,我的外部事件处理程序聚合重新运行s 所有以前的事件,所以我在启动时看到这个日志语句流。
同一事件处理程序还向聚合发布命令,然后聚合将新事件附加到处理命令的聚合。因此,每次我重新启动应用程序时,我的聚合最终都会在它的末尾添加一些事件,而我只想要或期望一个事件。
flsh.axon.LetterSchedulingHandler : Sending letter 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f...
flsh.axon.LetterSchedulingHandler : Sending letter 6b4f6966-85ea-46e0-9c49-21bcd501a1b5...
flsh.axon.LetterSchedulingHandler : Sending letter fc36292f-c7bd-4575-b56f-130624a87466...
flsh.axon.Letter : LetterScheduledEvent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SCHEDULED
flsh.axon.Letter : LetterScheduledEvent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SCHEDULED
flsh.axon.Letter : LetterScheduledEvent fc36292f-c7bd-4575-b56f-130624a87466 SCHEDULED
flsh.axon.Letter : Letter sent fc36292f-c7bd-4575-b56f-130624a87466 SENT
flsh.axon.Letter : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter : Letter sent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SENT
flsh.axon.Letter : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter : Letter sent fc36292f-c7bd-4575-b56f-130624a87466 SENT
flsh.axon.Letter : Letter sent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SENT
我的事件处理程序如下所示:
@Slf4j
@Component
public class LetterSchedulingHandler {
private final CommandGateway commandGateway;
public LetterSchedulingHandler(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}
@DisallowReplay //this doesn't seem to work
@EventHandler
public void handle(BeginSendLetterEvent event) {
log.info("Sending letter {}...", event.getLetterId());
commandGateway.send(new LetterSentCommand(event.getLetterId()));
}
}
@CommandHandler
public void handle(LetterSentCommand cmd) {
AggregateLifecycle.apply(new LetterSentEvent(cmd.getLetterId()));
}
这些事件正在通过调度程序发布...即通过不同的 @CommandHandler
聚合 运行,否则会按预期成功 运行。
@CommandHandler
public Letter(ScheduleLetterCommand cmd, EventScheduler scheduler) {
String id = cmd.getLetterId();
log.info("Received schedule command for letter id {}", id);
ScheduleToken scheduleToken = scheduler.schedule(Duration.ofSeconds(5), new BeginSendLetterEvent(id));
AggregateLifecycle.apply(new LetterScheduledEvent(id, scheduleToken));
}
@DisallowReplay
注释似乎无法阻止这种情况。此外,我尝试按照 directions here 使处理程序成为 Subscribing
事件处理器,但要么我没有做对,要么也没有解决问题。
axon:
axonserver:
servers: localhost
eventhandling:
processors:
LetterSchedulingHandler:
mode: subscribing
你错过的@GoldFLsh,是这两个之一:
- 正确引用
LetterSchedulingHandler
后面的 EventProcessor
- 定义
LetterSchedulingHandler
的 @ProcessingGroup
您可能已经在 Event Processors 的参考指南中阅读过,因为任何事件处理组件(例如阅读您的 LetterSchedulingHandler
)都将与订阅中的其他事件处理组件组合在一起或跟踪事件处理器。
但是您在此阶段还没有配置任何处理组名称。这意味着 LetterSchedulingHandler
的处理组将默认为处理程序的包名称。因此,在您的属性文件中,您应该使用包名称而不是 LetterSchedulingHandler
来将其定义为正在订阅。
但更清楚的是,在 LetterSchedulingHandler
上添加 @ProcessingGroup
注释,给它一个清晰的名称,然后您可以在属性文件中使用它。
最后,您没有正确引用事件处理器这一事实是您会看到事件重播的原因。 Axon 默认为 TrackingEventProcessor
,它跟踪处理流事件的程度。如果没有持久化单元来存储这些TrackingTokens
,它总是会从头开始。
我正在本地玩 axon 服务器。我通过 运行 命令 docker run -d --name axonserver -p 8024:8024 -p 8124:8124 axoniq/axonserver
.
当我启动我的 spring-boot 应用程序时,我的外部事件处理程序聚合重新运行s 所有以前的事件,所以我在启动时看到这个日志语句流。
同一事件处理程序还向聚合发布命令,然后聚合将新事件附加到处理命令的聚合。因此,每次我重新启动应用程序时,我的聚合最终都会在它的末尾添加一些事件,而我只想要或期望一个事件。
flsh.axon.LetterSchedulingHandler : Sending letter 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f...
flsh.axon.LetterSchedulingHandler : Sending letter 6b4f6966-85ea-46e0-9c49-21bcd501a1b5...
flsh.axon.LetterSchedulingHandler : Sending letter fc36292f-c7bd-4575-b56f-130624a87466...
flsh.axon.Letter : LetterScheduledEvent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SCHEDULED
flsh.axon.Letter : LetterScheduledEvent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SCHEDULED
flsh.axon.Letter : LetterScheduledEvent fc36292f-c7bd-4575-b56f-130624a87466 SCHEDULED
flsh.axon.Letter : Letter sent fc36292f-c7bd-4575-b56f-130624a87466 SENT
flsh.axon.Letter : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter : Letter sent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SENT
flsh.axon.Letter : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter : Letter sent fc36292f-c7bd-4575-b56f-130624a87466 SENT
flsh.axon.Letter : Letter sent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SENT
我的事件处理程序如下所示:
@Slf4j
@Component
public class LetterSchedulingHandler {
private final CommandGateway commandGateway;
public LetterSchedulingHandler(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}
@DisallowReplay //this doesn't seem to work
@EventHandler
public void handle(BeginSendLetterEvent event) {
log.info("Sending letter {}...", event.getLetterId());
commandGateway.send(new LetterSentCommand(event.getLetterId()));
}
}
@CommandHandler
public void handle(LetterSentCommand cmd) {
AggregateLifecycle.apply(new LetterSentEvent(cmd.getLetterId()));
}
这些事件正在通过调度程序发布...即通过不同的 @CommandHandler
聚合 运行,否则会按预期成功 运行。
@CommandHandler
public Letter(ScheduleLetterCommand cmd, EventScheduler scheduler) {
String id = cmd.getLetterId();
log.info("Received schedule command for letter id {}", id);
ScheduleToken scheduleToken = scheduler.schedule(Duration.ofSeconds(5), new BeginSendLetterEvent(id));
AggregateLifecycle.apply(new LetterScheduledEvent(id, scheduleToken));
}
@DisallowReplay
注释似乎无法阻止这种情况。此外,我尝试按照 directions here 使处理程序成为 Subscribing
事件处理器,但要么我没有做对,要么也没有解决问题。
axon:
axonserver:
servers: localhost
eventhandling:
processors:
LetterSchedulingHandler:
mode: subscribing
你错过的@GoldFLsh,是这两个之一:
- 正确引用
LetterSchedulingHandler
后面的 - 定义
LetterSchedulingHandler
的
EventProcessor
@ProcessingGroup
您可能已经在 Event Processors 的参考指南中阅读过,因为任何事件处理组件(例如阅读您的 LetterSchedulingHandler
)都将与订阅中的其他事件处理组件组合在一起或跟踪事件处理器。
但是您在此阶段还没有配置任何处理组名称。这意味着 LetterSchedulingHandler
的处理组将默认为处理程序的包名称。因此,在您的属性文件中,您应该使用包名称而不是 LetterSchedulingHandler
来将其定义为正在订阅。
但更清楚的是,在 LetterSchedulingHandler
上添加 @ProcessingGroup
注释,给它一个清晰的名称,然后您可以在属性文件中使用它。
最后,您没有正确引用事件处理器这一事实是您会看到事件重播的原因。 Axon 默认为 TrackingEventProcessor
,它跟踪处理流事件的程度。如果没有持久化单元来存储这些TrackingTokens
,它总是会从头开始。