当可观察项通过它时锁定运算符链
lock an operator chain while an observable item is passing through it
我有一个 Observable 源和一个将源转换为目标类型的运算符链。通常每个源项目最多生产一个目标。
Source -> Operator chain -> Target
运算符逻辑有点复杂,涉及多个使用 IO 调度程序的异步数据库调用。我在这里省略了细节,因为它似乎不相关。
我看到的是,新的 Observables 来自 Source,而之前的 Observables 仍在由链处理。所以它类似于某种管道。这在很多情况下可能是一件好事,但在我的情况下却不是。
所以我正在寻找一种方法来延迟源项目进入链(有效地锁定它)直到前一个项目到达目标。是否有任何已知的执行此操作的模式?
我看到的一个丑陋的解决方案是在链的开头使用这样的东西:
zip(source, signal, (source, signal)->source)
其中信号是一个自定义可观察对象,用于在每次链准备好接受新的源项目时推送通知(最初是一个通知,当正在处理的项目到达链的末端时)
但我觉得它有点hacky。能否更优雅地实现或使用一组标准运算符?
这是重现我不想要的行为的合成示例。
源是 100ms 间隔定时器。
运算符链是一个缓慢的(比源代码慢 10 倍)异步调用,它在 Schedulers.io() 上计算一个正方形
目标项实际上是源的平方。
Subscription s = Observable.timer(100, 100, TimeUnit.MILLISECONDS)
.doOnNext(source->System.out.println("source: " + source))
.concatMap(source->Observable.create(subscr->{
Schedulers.io().createWorker().schedule(()->{
subscr.onNext(source * source);
subscr.onCompleted();
}, 1000, TimeUnit.MILLISECONDS);
}))
.doOnNext(target->System.out.println("target: " + target))
.subscribe();
Thread.sleep(10000);
s.unsubscribe();
源和目标都打印出来了:
source: 0
source: 1
source: 2
source: 3
source: 4
source: 5
source: 6
source: 7
source: 8
source: 9
source: 10
source: 11
target: 0
source: 12
source: 13
source: 14
source: 15
source: 16
source: 17
source: 18
source: 19
source: 20
target: 1
source: 21
source: 22
source: 23
source: 24
source: 25
source: 26
source: 27
source: 28
source: 29
source: 30
source: 31
target: 4
source: 32
source: 33
但是我想实现的是:
source: 0
target: 0
source: 1
target: 1
source: 2
target: 4
...
根据您的源类型,这可以通过将 flatMap
参数化为 maxConcurrency = 1
:
来实现
Observable.interval(100, 100, TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.doOnNext(source -> System.out.println("source: " + source))
.flatMap(source ->
Observable.just(source)
.map(v -> v * v)
.delay(1, TimeUnit.SECONDS), 1)
.doOnNext(target->System.out.println("target: " + target))
.subscribe();
Thread.sleep(10000);
此解决方案涉及缓冲,但如果源很热,您可能需要选择不同的背压策略。
与要求没有严格关系,但我想指出你的这个模式:
Schedulers.io().createWorker().schedule(()->{
subscr.onNext(source * source);
subscr.onCompleted();
}, 1000, TimeUnit.MILLISECONDS);
泄漏工作人员并将用不可重用的线程填满您的系统。如果你真的想通过一个 ˙Worker 延迟事件,你应该捕获并取消订阅工作实例:
Scheduler.Worker w = Schedulers.io().createWorker();
subscr.add(w);
w.schedule(() -> {
try {
subscr.onNext(source * source);
subscr.onCompleted();
} finally {
w.unsubscribe();
}
}, 1000, TimeUnit.MILLISECONDS);
我有一个 Observable 源和一个将源转换为目标类型的运算符链。通常每个源项目最多生产一个目标。
Source -> Operator chain -> Target
运算符逻辑有点复杂,涉及多个使用 IO 调度程序的异步数据库调用。我在这里省略了细节,因为它似乎不相关。 我看到的是,新的 Observables 来自 Source,而之前的 Observables 仍在由链处理。所以它类似于某种管道。这在很多情况下可能是一件好事,但在我的情况下却不是。
所以我正在寻找一种方法来延迟源项目进入链(有效地锁定它)直到前一个项目到达目标。是否有任何已知的执行此操作的模式?
我看到的一个丑陋的解决方案是在链的开头使用这样的东西:
zip(source, signal, (source, signal)->source)
其中信号是一个自定义可观察对象,用于在每次链准备好接受新的源项目时推送通知(最初是一个通知,当正在处理的项目到达链的末端时) 但我觉得它有点hacky。能否更优雅地实现或使用一组标准运算符?
这是重现我不想要的行为的合成示例。 源是 100ms 间隔定时器。 运算符链是一个缓慢的(比源代码慢 10 倍)异步调用,它在 Schedulers.io() 上计算一个正方形 目标项实际上是源的平方。
Subscription s = Observable.timer(100, 100, TimeUnit.MILLISECONDS)
.doOnNext(source->System.out.println("source: " + source))
.concatMap(source->Observable.create(subscr->{
Schedulers.io().createWorker().schedule(()->{
subscr.onNext(source * source);
subscr.onCompleted();
}, 1000, TimeUnit.MILLISECONDS);
}))
.doOnNext(target->System.out.println("target: " + target))
.subscribe();
Thread.sleep(10000);
s.unsubscribe();
源和目标都打印出来了:
source: 0
source: 1
source: 2
source: 3
source: 4
source: 5
source: 6
source: 7
source: 8
source: 9
source: 10
source: 11
target: 0
source: 12
source: 13
source: 14
source: 15
source: 16
source: 17
source: 18
source: 19
source: 20
target: 1
source: 21
source: 22
source: 23
source: 24
source: 25
source: 26
source: 27
source: 28
source: 29
source: 30
source: 31
target: 4
source: 32
source: 33
但是我想实现的是:
source: 0
target: 0
source: 1
target: 1
source: 2
target: 4
...
根据您的源类型,这可以通过将 flatMap
参数化为 maxConcurrency = 1
:
Observable.interval(100, 100, TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.doOnNext(source -> System.out.println("source: " + source))
.flatMap(source ->
Observable.just(source)
.map(v -> v * v)
.delay(1, TimeUnit.SECONDS), 1)
.doOnNext(target->System.out.println("target: " + target))
.subscribe();
Thread.sleep(10000);
此解决方案涉及缓冲,但如果源很热,您可能需要选择不同的背压策略。
与要求没有严格关系,但我想指出你的这个模式:
Schedulers.io().createWorker().schedule(()->{
subscr.onNext(source * source);
subscr.onCompleted();
}, 1000, TimeUnit.MILLISECONDS);
泄漏工作人员并将用不可重用的线程填满您的系统。如果你真的想通过一个 ˙Worker 延迟事件,你应该捕获并取消订阅工作实例:
Scheduler.Worker w = Schedulers.io().createWorker();
subscr.add(w);
w.schedule(() -> {
try {
subscr.onNext(source * source);
subscr.onCompleted();
} finally {
w.unsubscribe();
}
}, 1000, TimeUnit.MILLISECONDS);