rxjava2 生产者-消费者,'downstream' 请求一个,'upstream' 发出一个

rxjava2 producer-consumer, 'downstream' request one, the 'upstream' emit one

我想使用 rxjava2 实现简单的生产者-消费者模型,下游请求一个,上游发出一个。

我知道 flatMapobserveOn 的默认缓冲区大小为 128,所以我将缓冲区大小设置为 1,但它也不起作用。

Flowable.defer((Callable<Publisher<Integer>>) () -> Flowable.range(1, 5))
            .flatMap((Function<Integer, Publisher<Integer>>) integer -> {
                //do something with long time.
                System.out.println("flatMap:" + integer);
                return Flowable.just(integer);
            }, false, 1) //=====> 1
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation(), false, 1) //=====> 2
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    //request one
                    s.request(1);
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("onNext:" + integer);
                }

                @Override
                public void onError(Throwable t) {

                }

                @Override
                public void onComplete() {

                }
            });

实际输出:

flatMap:1
flatMap:2
onNext:1
flatMap:3

预期输出,因为我只调用了一次s.request(1)

flatMap:1
onNext:1

您的观察者只请求一项,但 observeOn() 也会缓冲一项。 flatMap() 运算符本身将订阅连续的输入。

  1. 观察者订阅观察者链,并请求1个项目。
  2. observeOn() 为其缓冲区请求 1 个项目。
  3. range() 运算符发出 1.
  4. flatMap() 收到 1,并在内部订阅 flowable,导致第一行日志。
  5. observeOn() 为其缓冲区获取一项,然后请求另一项。
  6. flatMap() 获取下一个项目 2。它被发出并传递给 observeOn() 缓冲区
  7. 观察者的onNext()被调用。
  8. flatMap() 获取下一项,3.

如果你需要完美的锁步,"request one" -> "process one",那么流量控制不是实现它的方法。相反,您可能想要引入一个提供反馈循环的可观察对象,以便观察者告诉可观察对象处理下一个。