rxjava2 生产者-消费者,'downstream' 请求一个,'upstream' 发出一个
rxjava2 producer-consumer, 'downstream' request one, the 'upstream' emit one
我想使用 rxjava2 实现简单的生产者-消费者模型,下游请求一个,上游发出一个。
我知道 flatMap
或 observeOn
的默认缓冲区大小为 128,所以我将缓冲区大小设置为 1,但它也不起作用。
Flowable.defer((Callable<Publisher<Integer>>) () -> Flowable.range(1, 5))
.flatMap((Function<Integer, Publisher<Integer>>) integer -> {
//do something with long time.
System.out.println("flatMap:" + integer);
return Flowable.just(integer);
}, false, 1) //=====> 1
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation(), false, 1) //=====> 2
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
//request one
s.request(1);
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
实际输出:
flatMap:1
flatMap:2
onNext:1
flatMap:3
预期输出,因为我只调用了一次s.request(1)
:
flatMap:1
onNext:1
您的观察者只请求一项,但 observeOn()
也会缓冲一项。 flatMap()
运算符本身将订阅连续的输入。
- 观察者订阅观察者链,并请求1个项目。
observeOn()
为其缓冲区请求 1 个项目。
range()
运算符发出 1.
flatMap()
收到 1,并在内部订阅 flowable,导致第一行日志。
observeOn()
为其缓冲区获取一项,然后请求另一项。
flatMap()
获取下一个项目 2。它被发出并传递给 observeOn()
缓冲区
- 观察者的
onNext()
被调用。
flatMap()
获取下一项,3.
如果你需要完美的锁步,"request one" -> "process one",那么流量控制不是实现它的方法。相反,您可能想要引入一个提供反馈循环的可观察对象,以便观察者告诉可观察对象处理下一个。
我想使用 rxjava2 实现简单的生产者-消费者模型,下游请求一个,上游发出一个。
我知道 flatMap
或 observeOn
的默认缓冲区大小为 128,所以我将缓冲区大小设置为 1,但它也不起作用。
Flowable.defer((Callable<Publisher<Integer>>) () -> Flowable.range(1, 5))
.flatMap((Function<Integer, Publisher<Integer>>) integer -> {
//do something with long time.
System.out.println("flatMap:" + integer);
return Flowable.just(integer);
}, false, 1) //=====> 1
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation(), false, 1) //=====> 2
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
//request one
s.request(1);
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
实际输出:
flatMap:1
flatMap:2
onNext:1
flatMap:3
预期输出,因为我只调用了一次s.request(1)
:
flatMap:1
onNext:1
您的观察者只请求一项,但 observeOn()
也会缓冲一项。 flatMap()
运算符本身将订阅连续的输入。
- 观察者订阅观察者链,并请求1个项目。
observeOn()
为其缓冲区请求 1 个项目。range()
运算符发出 1.flatMap()
收到 1,并在内部订阅 flowable,导致第一行日志。observeOn()
为其缓冲区获取一项,然后请求另一项。flatMap()
获取下一个项目 2。它被发出并传递给observeOn()
缓冲区- 观察者的
onNext()
被调用。 flatMap()
获取下一项,3.
如果你需要完美的锁步,"request one" -> "process one",那么流量控制不是实现它的方法。相反,您可能想要引入一个提供反馈循环的可观察对象,以便观察者告诉可观察对象处理下一个。