不使用 EmitProcessor 的非阻塞锁

Non-blocking lock without using EmitProcessor

我有一个具有两个非阻塞操作的资源 (r)(这是来自我无法更改的外部库):

Mono<String> op1(String someInput);
Mono<String> op2(String otherInput);

op1() 和 op2() 可以在不同的线程上调用 但不能同时执行 。何时调用 op1 取决于外部因素,op2 也是如此。两个操作是不相关的(op1 可能被调用 100 次,op2 可能被调用 1 次)。如何保证op1和op2的处理互斥。如果 op1 /op2 阻塞,java 'synchronized' 会解决它。如果同时发生 op1 和 op2 处理,外部库方法将失败。

如何在不使用 EmitProcessor(已弃用)的情况下强制执行此类同步,以便可以从不同的调度程序线程调用 op1 和 op2?还是 WebFlux 中有内置的标准解决方案 api 来解决这种情况?

(有一个使用 EventProcessor 的解决方案,但希望避免使用它,因为 EventProcessor 已被弃用 Nonblocking ReentrantLock with Reactor

目前在 Reactor 中没有为此内置任何内容。

其他答案中的解决方案可能会更新为Sinks.Many新API,看来相关项目确实已更新:https://github.com/alex-pumpkin/reactor-lock

可以也使用https://github.com/reactor/reactor-pool,但这有点矫枉过正。

以下解决方案确保 op1 和 op2 的互斥处理:

public class Locker {
    private final AtomicBoolean locked = new AtomicBoolean(true);
    private final Flux<Boolean> notifier;
    private final Sinks.Many<Boolean> notifierSink;

    public Locker() {
        this.notifierSink = Sinks.many().multicast().onBackpressureBuffer(1, false);
        this.notifier = notifierSink.asFlux();
        this.notifierSink.emitNext(true, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    public <T> Flux<T> lockThenProcess(Duration lockTimeout, Flux<T> job) {
        return notifier.filter(v -> obtainLock())
                .next()
                .transform(locked -> lockTimeout == null ? locked : locked.timeout(lockTimeout))
                .doOnSubscribe(s -> log.debug("obtaining lock"))
                .doOnError(th -> log.error("can't obtain lock: " + th.getMessage(), th))
                .flatMapMany(v -> job)
                .doFinally(s -> {
                    if (releaseLock()) {
                        log.debug("released lock");
                        notifierSink.emitNext(true, Sinks.EmitFailureHandler.FAIL_FAST);
                    }
                });
    }

    private synchronized boolean obtainLock() {
        return locked.getAndSet(false);
    }

    private synchronized boolean releaseLock() {
        locked.set(true);
        return locked.get();
    }
}

然后,调用 op1 和 op2(在任何线程上)如下:

op1Trigger.concatMap(v -> locker.lockThenProcess(Duration.ofMinutes(1), r.op1(input).flux()))

op2Trigger.concatMap(v -> locker.lockThenProcess(Duration.ofMinutes(1), r.op2(input).flux()))