如何在另一个 Mono 终止后触发 Mono 执行
How to trigger Mono execution after another Mono terminates
当我尝试在 doFinally
子句中执行 Mono 时遇到问题。
这是我的代码。
public interface Locks {
Mono<ReactiveDistributedLock> doLock(LockParams params);
Mono<Boolean> doUnlock(ReactiveDistributedLock lock);
default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
return doLock(params)
.flatMap(lock -> stage.get().doFinally(ignored -> doUnlock(lock)));
}
问题是 doFinally()
returns 里面的 doUnlock(lock)
是一个没有人订阅的单声道,因为 doFinally
没有链接。所以 doUnlock
中的异步代码部分从未真正执行过。
有没有办法使用 Mono
或 Flux
助手来避免这种情况?
使用Mono#then.
不幸的是,您无法避免使用 Mono/Flux,一旦您的 API 建立在它之上,但是,您可以通过以下方式解决该问题。
要链接几个独立的执行,应该一个接一个地订阅,并且第一个的结果将在第一个完成后 returned,有一个 Mono#then
运算符允许编写以下(类似承诺的)代码:
public interface Locks {
Mono<ReactiveDistributedLock> doLock(LockParams params);
Mono<Boolean> doUnlock(ReactiveDistributedLock lock);
default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
return doLock(params)
.flatMap(lock ->
stage.get()
.flatMap(value ->
doUnlock(lock)
.then(Mono.just(value))
)
);
}
}
这里为了链式执行然后释放锁然后return暂存值,我们使用flatMap
映射值作为释放锁和then
return 再次使用分阶段值。 (承认,听起来很尴尬)
注意,在error终端信号的情况下,then
将被忽略。因此,要实现 try-finally 行为,可能需要提供额外的 orErrorResume
运算符,如下例所示:
public interface Locks {
Mono<ReactiveDistributedLock> doLock(LockParams params);
Mono<Boolean> doUnlock(ReactiveDistributedLock lock);
default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
return doLock(params)
.flatMap(lock ->
stage.get()
.flatMap(value ->
doUnlock(lock)
.then(Mono.just(value))
)
.onErrorResume(t ->
doUnlock(lock)
.then(Mono.error(t))
)
);
}
}
当我尝试在 doFinally
子句中执行 Mono 时遇到问题。
这是我的代码。
public interface Locks {
Mono<ReactiveDistributedLock> doLock(LockParams params);
Mono<Boolean> doUnlock(ReactiveDistributedLock lock);
default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
return doLock(params)
.flatMap(lock -> stage.get().doFinally(ignored -> doUnlock(lock)));
}
问题是 doFinally()
returns 里面的 doUnlock(lock)
是一个没有人订阅的单声道,因为 doFinally
没有链接。所以 doUnlock
中的异步代码部分从未真正执行过。
有没有办法使用 Mono
或 Flux
助手来避免这种情况?
使用Mono#then.
不幸的是,您无法避免使用 Mono/Flux,一旦您的 API 建立在它之上,但是,您可以通过以下方式解决该问题。
要链接几个独立的执行,应该一个接一个地订阅,并且第一个的结果将在第一个完成后 returned,有一个 Mono#then
运算符允许编写以下(类似承诺的)代码:
public interface Locks {
Mono<ReactiveDistributedLock> doLock(LockParams params);
Mono<Boolean> doUnlock(ReactiveDistributedLock lock);
default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
return doLock(params)
.flatMap(lock ->
stage.get()
.flatMap(value ->
doUnlock(lock)
.then(Mono.just(value))
)
);
}
}
这里为了链式执行然后释放锁然后return暂存值,我们使用flatMap
映射值作为释放锁和then
return 再次使用分阶段值。 (承认,听起来很尴尬)
注意,在error终端信号的情况下,then
将被忽略。因此,要实现 try-finally 行为,可能需要提供额外的 orErrorResume
运算符,如下例所示:
public interface Locks {
Mono<ReactiveDistributedLock> doLock(LockParams params);
Mono<Boolean> doUnlock(ReactiveDistributedLock lock);
default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
return doLock(params)
.flatMap(lock ->
stage.get()
.flatMap(value ->
doUnlock(lock)
.then(Mono.just(value))
)
.onErrorResume(t ->
doUnlock(lock)
.then(Mono.error(t))
)
);
}
}