Axon 4:从不同线程应用事件时未触发 EventSourcingHandler
Axon 4: EventSourcingHandler not triggered when applying event from a different thread
我在 Axon 4 中遇到了命令处理的小问题。
假设我有一个聚合需要在处理命令时调用外部服务。
外部服务使用异步客户端(vertx tcp 客户端+ rxjava),因此响应是在与创建聚合实例的线程不同的线程中给出的。
我想根据我的服务结果应用一个事件,但它不起作用,因为 AggregateLifecycle.apply()
调用在不同的线程上...
如何"transfert"聚合范围?
这是我想做的事情的一个小例子(使用 rxjava 2 和 lombok):
合计:
@Slf4j
@Aggregate
@NoArgsConstructor
public class MyAggregate {
@AggregateIdentifier
private String id;
@CommandHandler
public MyAggregate(CreationCommand creationCommand) {
Single.just("some data")
.observeOn(Schedulers.computation()) // <- comment this line and the test pass, uncomment and it fail because apply is on another thread ?
.subscribe((s, throwable) -> apply(new AggregateCreatedEvent(creationCommand.getId())));
}
@EventSourcingHandler
public void on(AggregateCreatedEvent event) {
this.id = event.getId();
}
}
@Value class CreationCommand { String id; }
@Value class AggregateCreatedEvent { String id;}
和测试:
public class MyAggregateTest {
AggregateTestFixture<MyAggregate> testFixture = new AggregateTestFixture<>(MyAggregate.class);
@Test
public void test() {
testFixture.givenNoPriorActivity()
.when(new CreationCommand("123"))
.expectEvents(new AggregateCreatedEvent("123"));
}
}
这是我遇到的错误:
java.lang.IllegalStateException: Cannot request current Scope if none is active
事件必须应用于管理该工作单元的线程,在本例中为 CommandHandler。 Axon 为异步操作提供了自己的机制。命令总线异步接受命令,事件处理器异步处理事件。异步实现您的 CommandHandler 也没有任何好处,无论如何,目前不支持它。
应用事件所需的所有数据通常应在命令或聚合状态下可用,而不是来自其他外部源。
这可能是您希望命令处理程序的样子:
@CommandHandler
public MyAggregate(CreationCommand creationCommand) {
apply(new AggregateCreatedEvent(creationCommand.getId());
}
我在 Axon 4 中遇到了命令处理的小问题。
假设我有一个聚合需要在处理命令时调用外部服务。
外部服务使用异步客户端(vertx tcp 客户端+ rxjava),因此响应是在与创建聚合实例的线程不同的线程中给出的。
我想根据我的服务结果应用一个事件,但它不起作用,因为 AggregateLifecycle.apply()
调用在不同的线程上...
如何"transfert"聚合范围?
这是我想做的事情的一个小例子(使用 rxjava 2 和 lombok):
合计:
@Slf4j
@Aggregate
@NoArgsConstructor
public class MyAggregate {
@AggregateIdentifier
private String id;
@CommandHandler
public MyAggregate(CreationCommand creationCommand) {
Single.just("some data")
.observeOn(Schedulers.computation()) // <- comment this line and the test pass, uncomment and it fail because apply is on another thread ?
.subscribe((s, throwable) -> apply(new AggregateCreatedEvent(creationCommand.getId())));
}
@EventSourcingHandler
public void on(AggregateCreatedEvent event) {
this.id = event.getId();
}
}
@Value class CreationCommand { String id; }
@Value class AggregateCreatedEvent { String id;}
和测试:
public class MyAggregateTest {
AggregateTestFixture<MyAggregate> testFixture = new AggregateTestFixture<>(MyAggregate.class);
@Test
public void test() {
testFixture.givenNoPriorActivity()
.when(new CreationCommand("123"))
.expectEvents(new AggregateCreatedEvent("123"));
}
}
这是我遇到的错误:
java.lang.IllegalStateException: Cannot request current Scope if none is active
事件必须应用于管理该工作单元的线程,在本例中为 CommandHandler。 Axon 为异步操作提供了自己的机制。命令总线异步接受命令,事件处理器异步处理事件。异步实现您的 CommandHandler 也没有任何好处,无论如何,目前不支持它。
应用事件所需的所有数据通常应在命令或聚合状态下可用,而不是来自其他外部源。
这可能是您希望命令处理程序的样子:
@CommandHandler
public MyAggregate(CreationCommand creationCommand) {
apply(new AggregateCreatedEvent(creationCommand.getId());
}