如何在另一个 Mono 终止后触发 Mono 执行

How to trigger Mono execution after another Mono terminates

当我尝试在 doFinally 子句中执行 Mono 时遇到问题。 这是我的代码。

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> stage.get().doFinally(ignored -> doUnlock(lock)));
}

问题是 doFinally() returns 里面的 doUnlock(lock) 是一个没有人订阅的单声道,因为 doFinally 没有链接。所以 doUnlock 中的异步代码部分从未真正执行过。

有没有办法使用 MonoFlux 助手来避免这种情况?

使用Mono#then.

不幸的是,您无法避免使用 Mono/Flux,一旦您的 API 建立在它之上,但是,您可以通过以下方式解决该问题。

要链接几个独立的执行,应该一个接一个地订阅,并且第一个的结果将在第一个完成后 returned,有一个 Mono#then 运算符允许编写以下(类似承诺的)代码:

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> 
                    stage.get()
                         .flatMap(value -> 
                            doUnlock(lock)
                            .then(Mono.just(value))
                         )
                );
    }
}

这里为了链式执行然后释放锁然后return暂存值,我们使用flatMap映射值作为释放锁和thenreturn 再次使用分阶段值。 (承认,听起来很尴尬)

注意,在error终端信号的情况下,then将被忽略。因此,要实现 try-finally 行为,可能需要提供额外的 orErrorResume 运算符,如下例所示:

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> 
                    stage.get()
                         .flatMap(value -> 
                            doUnlock(lock)
                            .then(Mono.just(value))
                         )
                         .onErrorResume(t -> 
                            doUnlock(lock)
                            .then(Mono.error(t))
                         )
                );
    }
}