Flowable Subscriber:request() 方法实际上做了什么?

Flowable Subscriber: What request() method does actually?

谁能说出 DisposableSubscriber 中的 request() 方法的作用以及何时使用它?我们仅在您使用 Flowable.create 创建自己的 Flowable 时使用它?官方文档说

request(long n): Requests the specified amount from the upstream if its Subscription is set via onSubscribe already.

但我不明白这是什么意思。为了尝试,我制作了一个示例,如下所示

private Flowable<Long> streamOfNums() {
    return Flowable.create(e -> {
        for (int i = 0; i < 500; i++) {
            e.onNext((long) i);
            Log.d(TAG, "produced "+i);
        }
    }, BackpressureStrategy.BUFFER);
}

喜欢

        streamOfNums()
            .subscribeOn(Schedulers.io())
            .subscribeWith(new DisposableSubscriber<Long>() {

                @Override
                protected void onStart() {
                    super.onStart();
                    Log.d(TAG, "onStart: ");
                }

                @Override
                public void onNext(Long aLong) {
                    Log.d(TAG, "onNext: ");
                    try {
                        Log.d(TAG, "consuming data :"+aLong);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    request(4);
                }

                @Override
                public void onError(Throwable t) {

                }

                @Override
                public void onComplete() {

                }
            });

我能看到的是每次发射器在给定的延迟(2000 毫秒)后产生数字。我给了 request(4) 但即使没有它,它也以完全相同的方式工作。 任何人都可以解释 request 什么时候做什么以及什么时候使用它。是否可以用于分页场景?

request 允许消费者告诉生产者要生产多少元素。默认情况下,DisposableSubscriber 在其 onStart() 方法中请求 Long.MAX_VALUE,在这种情况下,进一步的 request() 调用无效。

很少需要在这样的最终消费者中实际调用 request,但是当您的最终消费者充当异步边界时,您可以使用它来避免缓冲区溢出:

ExecutorService executor = Executors.newSingleThreadedExecutor();

Flowable.range(1, 500)
    .doOnNext(v -> Log.d("produced: " + v))
    .subscribeOn(Schedulers.io())
    .subscribe(new DisposableSubscriber<Long>() {
            @Override protected void onStart() {
                Log.d(TAG, "onStart: "); // <----- no super.onStart() here!
                request(1);
            }
            @Override public void onNext(Long aLong) {
                executor.execute(() -> {
                    Log.d(TAG, "onNext: ");
                    try {
                        Log.d(TAG, "consuming data :"+aLong);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    request(1);
                });
            }
            @Override public void onError(Throwable t) {
                 executor.execute(() -> t.printStackTrace());
            }

            @Override public void onComplete() {
                 executor.execute(() -> Log.d("onComplete"));
            }
        });

Thread.sleep(100_000);
executor.shutdown();