反应式应用程序中的事务回滚

Transaction rollback in a reactive application

我正在使用 RxJava 1.1 从 Spring 应用程序内部组成一个可观察序列,如下所示:

@Transaction
public Observable<Event> create(Event event) {
     return Observable.just(event)
            .flatMap(event -> {
                //save event to db (blocking JPA operation)
                Event event = eventRepository.save(event); 
                return Observable.just(event);
            })
            //async REST call to service A
            .flatMap(this::sendEventToServiceA) <---- may execute on different thread
            //async REST call to service B
            .flatMap(this::sendEventToServiceB) <---- may execute on different thread
            .doOnError( throwable -> {
                // ? rollback initally created transaction?
            })
}

一个事件从某个控制器 class 到达我的应用程序的服务层,并通过使用 RxJava 的 flatMap() 函数构建的一系列操作传播。该事件首先存储在数据库中(Spring 数据),接下来的两个异步 HTTP 请求在幕后使用 Spring 的 AsyncRestTemplate 库一个接一个地执行。

万一在管道中的任何地方抛出 error/exception,我希望能够回滚数据库事务,以便事件不存储在数据库中。我发现这并不容易做到,因为在 Spring 中事务上下文与特定的执行线程相关联。因此,如果代码到达不同线程上的 doOnError 回调(AsyncRestTemplate 使用其自己的 AsyncTaskExecutor),则无法回滚最初创建的事务。

能否请您提供任何机制来实现由多个异步操作组成的多线程应用程序以这种方式编写的事务?

我还尝试以编程方式创建交易:

TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());

然后通过管道发送 transactionStatus 对象和事件,但是再次发生错误时我调用 "platformTransactionManager.rollback(status);",我得到 "transaction synchronization is not active" 因为这是 运行我猜是一个不同的线程。

p.s。 sendEventToServiceA / sendEventToServiceB 方法类似于:

public Observable<Event> sendEventToServiceA(event) {
    ..........
    ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
              "/serviceA/create?event_id=" + event.id,
              HttpMethod.POST, requestEntity, String.class);

    return ObservableUtil.toRxObservable(listenableFuture);
}

这样做的一种方法是确保在与数据库保存相同的线程上观察到错误:

@Transaction
public Observable<Event> create(Event event) {

     Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
     return Observable.just(event)
            .flatMap(event -> {
                //save event to db (blocking JPA operation)
                Event event = eventRepository.save(event); 
                return Observable.just(event);
            })
            .subscribeOn(scheduler)
            //async REST call to service A
            .flatMap(this::sendEventToServiceA) <---- may execute on different thread
            //async REST call to service B
            .flatMap(this::sendEventToServiceB) <---- may execute on different thread
            .observeOn(scheduler)
            .doOnError( throwable -> {
                // ? rollback initally created transaction?
            })
}