当可观察项通过它时锁定运算符链

lock an operator chain while an observable item is passing through it

我有一个 Observable 源和一个将源转换为目标类型的运算符链。通常每个源项目最多生产一个目标。

Source -> Operator chain -> Target

运算符逻辑有点复杂,涉及多个使用 IO 调度程序的异步数据库调用。我在这里省略了细节,因为它似乎不相关。 我看到的是,新的 Observables 来自 Source,而之前的 Observables 仍在由链处理。所以它类似于某种管道。这在很多情况下可能是一件好事,但在我的情况下却不是。

所以我正在寻找一种方法来延迟源项目进入链(有效地锁定它)直到前一个项目到达目标。是否有任何已知的执行此操作的模式?

我看到的一个丑陋的解决方案是在链的开头使用这样的东西:

zip(source, signal, (source, signal)->source)

其中信号是一个自定义可观察对象,用于在每次链准备好接受新的源项目时推送通知(最初是一个通知,当正在处理的项目到达链的末端时) 但我觉得它有点hacky。能否更优雅地实现或使用一组标准运算符?

这是重现我不想要的行为的合成示例。 源是 100ms 间隔定时器。 运算符链是一个缓慢的(比源代码慢 10 倍)异步调用,它在 Schedulers.io() 上计算一个正方形 目标项实际上是源的平方。

Subscription s = Observable.timer(100, 100, TimeUnit.MILLISECONDS)
    .doOnNext(source->System.out.println("source: " + source))
    .concatMap(source->Observable.create(subscr->{
      Schedulers.io().createWorker().schedule(()->{
        subscr.onNext(source * source);
        subscr.onCompleted();
      }, 1000, TimeUnit.MILLISECONDS);
    }))
    .doOnNext(target->System.out.println("target: " + target))
    .subscribe();
Thread.sleep(10000);
s.unsubscribe();

源和目标都打印出来了:

source: 0
source: 1
source: 2
source: 3
source: 4
source: 5
source: 6
source: 7
source: 8
source: 9
source: 10
source: 11
target: 0
source: 12
source: 13
source: 14
source: 15
source: 16
source: 17
source: 18
source: 19
source: 20
target: 1
source: 21
source: 22
source: 23
source: 24
source: 25
source: 26
source: 27
source: 28
source: 29
source: 30
source: 31
target: 4
source: 32
source: 33

但是我想实现的是:

source: 0
target: 0
source: 1
target: 1
source: 2
target: 4
...

根据您的源类型,这可以通过将 flatMap 参数化为 maxConcurrency = 1:

来实现
Observable.interval(100, 100, TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.doOnNext(source -> System.out.println("source: " + source))
.flatMap(source -> 
     Observable.just(source)
     .map(v -> v * v)
     .delay(1, TimeUnit.SECONDS), 1)
.doOnNext(target->System.out.println("target: " + target))
.subscribe();
Thread.sleep(10000);

此解决方案涉及缓冲,但如果源很热,您可能需要选择不同的背压策略。

与要求没有严格关系,但我想指出你的这个模式:

Schedulers.io().createWorker().schedule(()->{
    subscr.onNext(source * source);
    subscr.onCompleted();
  }, 1000, TimeUnit.MILLISECONDS);

泄漏工作人员并将用不可重用的线程填满您的系统。如果你真的想通过一个 ˙Worker 延迟事件,你应该捕获并取消订阅工作实例:

Scheduler.Worker w = Schedulers.io().createWorker();
subscr.add(w);
w.schedule(() -> {
    try {
        subscr.onNext(source * source);
        subscr.onCompleted();
    } finally {
        w.unsubscribe();
    }
}, 1000, TimeUnit.MILLISECONDS);