不使用 EmitProcessor 的非阻塞锁
Non-blocking lock without using EmitProcessor
我有一个具有两个非阻塞操作的资源 (r)(这是来自我无法更改的外部库):
Mono<String> op1(String someInput);
Mono<String> op2(String otherInput);
op1() 和 op2() 可以在不同的线程上调用 但不能同时执行 。何时调用 op1 取决于外部因素,op2 也是如此。两个操作是不相关的(op1 可能被调用 100 次,op2 可能被调用 1 次)。如何保证op1和op2的处理互斥。如果 op1 /op2 阻塞,java 'synchronized' 会解决它。如果同时发生 op1 和 op2 处理,外部库方法将失败。
如何在不使用 EmitProcessor(已弃用)的情况下强制执行此类同步,以便可以从不同的调度程序线程调用 op1 和 op2?还是 WebFlux 中有内置的标准解决方案 api 来解决这种情况?
(有一个使用 EventProcessor 的解决方案,但希望避免使用它,因为 EventProcessor 已被弃用 Nonblocking ReentrantLock with Reactor)
目前在 Reactor 中没有为此内置任何内容。
其他答案中的解决方案可能会更新为Sinks.Many
新API,看来相关项目确实已更新:https://github.com/alex-pumpkin/reactor-lock
您可以也使用https://github.com/reactor/reactor-pool,但这有点矫枉过正。
以下解决方案确保 op1 和 op2 的互斥处理:
public class Locker {
private final AtomicBoolean locked = new AtomicBoolean(true);
private final Flux<Boolean> notifier;
private final Sinks.Many<Boolean> notifierSink;
public Locker() {
this.notifierSink = Sinks.many().multicast().onBackpressureBuffer(1, false);
this.notifier = notifierSink.asFlux();
this.notifierSink.emitNext(true, Sinks.EmitFailureHandler.FAIL_FAST);
}
public <T> Flux<T> lockThenProcess(Duration lockTimeout, Flux<T> job) {
return notifier.filter(v -> obtainLock())
.next()
.transform(locked -> lockTimeout == null ? locked : locked.timeout(lockTimeout))
.doOnSubscribe(s -> log.debug("obtaining lock"))
.doOnError(th -> log.error("can't obtain lock: " + th.getMessage(), th))
.flatMapMany(v -> job)
.doFinally(s -> {
if (releaseLock()) {
log.debug("released lock");
notifierSink.emitNext(true, Sinks.EmitFailureHandler.FAIL_FAST);
}
});
}
private synchronized boolean obtainLock() {
return locked.getAndSet(false);
}
private synchronized boolean releaseLock() {
locked.set(true);
return locked.get();
}
}
然后,调用 op1 和 op2(在任何线程上)如下:
op1Trigger.concatMap(v -> locker.lockThenProcess(Duration.ofMinutes(1), r.op1(input).flux()))
和
op2Trigger.concatMap(v -> locker.lockThenProcess(Duration.ofMinutes(1), r.op2(input).flux()))
我有一个具有两个非阻塞操作的资源 (r)(这是来自我无法更改的外部库):
Mono<String> op1(String someInput);
Mono<String> op2(String otherInput);
op1() 和 op2() 可以在不同的线程上调用 但不能同时执行 。何时调用 op1 取决于外部因素,op2 也是如此。两个操作是不相关的(op1 可能被调用 100 次,op2 可能被调用 1 次)。如何保证op1和op2的处理互斥。如果 op1 /op2 阻塞,java 'synchronized' 会解决它。如果同时发生 op1 和 op2 处理,外部库方法将失败。
如何在不使用 EmitProcessor(已弃用)的情况下强制执行此类同步,以便可以从不同的调度程序线程调用 op1 和 op2?还是 WebFlux 中有内置的标准解决方案 api 来解决这种情况?
(有一个使用 EventProcessor 的解决方案,但希望避免使用它,因为 EventProcessor 已被弃用 Nonblocking ReentrantLock with Reactor)
目前在 Reactor 中没有为此内置任何内容。
其他答案中的解决方案可能会更新为Sinks.Many
新API,看来相关项目确实已更新:https://github.com/alex-pumpkin/reactor-lock
您可以也使用https://github.com/reactor/reactor-pool,但这有点矫枉过正。
以下解决方案确保 op1 和 op2 的互斥处理:
public class Locker {
private final AtomicBoolean locked = new AtomicBoolean(true);
private final Flux<Boolean> notifier;
private final Sinks.Many<Boolean> notifierSink;
public Locker() {
this.notifierSink = Sinks.many().multicast().onBackpressureBuffer(1, false);
this.notifier = notifierSink.asFlux();
this.notifierSink.emitNext(true, Sinks.EmitFailureHandler.FAIL_FAST);
}
public <T> Flux<T> lockThenProcess(Duration lockTimeout, Flux<T> job) {
return notifier.filter(v -> obtainLock())
.next()
.transform(locked -> lockTimeout == null ? locked : locked.timeout(lockTimeout))
.doOnSubscribe(s -> log.debug("obtaining lock"))
.doOnError(th -> log.error("can't obtain lock: " + th.getMessage(), th))
.flatMapMany(v -> job)
.doFinally(s -> {
if (releaseLock()) {
log.debug("released lock");
notifierSink.emitNext(true, Sinks.EmitFailureHandler.FAIL_FAST);
}
});
}
private synchronized boolean obtainLock() {
return locked.getAndSet(false);
}
private synchronized boolean releaseLock() {
locked.set(true);
return locked.get();
}
}
然后,调用 op1 和 op2(在任何线程上)如下:
op1Trigger.concatMap(v -> locker.lockThenProcess(Duration.ofMinutes(1), r.op1(input).flux()))
和
op2Trigger.concatMap(v -> locker.lockThenProcess(Duration.ofMinutes(1), r.op2(input).flux()))