Flowable Subscriber:request() 方法实际上做了什么?
Flowable Subscriber: What request() method does actually?
谁能说出 DisposableSubscriber
中的 request()
方法的作用以及何时使用它?我们仅在您使用 Flowable.create
创建自己的 Flowable
时使用它?官方文档说
request(long n):
Requests the specified amount from the upstream if its Subscription is set via onSubscribe already.
但我不明白这是什么意思。为了尝试,我制作了一个示例,如下所示
private Flowable<Long> streamOfNums() {
return Flowable.create(e -> {
for (int i = 0; i < 500; i++) {
e.onNext((long) i);
Log.d(TAG, "produced "+i);
}
}, BackpressureStrategy.BUFFER);
}
喜欢
streamOfNums()
.subscribeOn(Schedulers.io())
.subscribeWith(new DisposableSubscriber<Long>() {
@Override
protected void onStart() {
super.onStart();
Log.d(TAG, "onStart: ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: ");
try {
Log.d(TAG, "consuming data :"+aLong);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(4);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
我能看到的是每次发射器在给定的延迟(2000 毫秒)后产生数字。我给了 request(4)
但即使没有它,它也以完全相同的方式工作。
任何人都可以解释 request
什么时候做什么以及什么时候使用它。是否可以用于分页场景?
request
允许消费者告诉生产者要生产多少元素。默认情况下,DisposableSubscriber
在其 onStart()
方法中请求 Long.MAX_VALUE
,在这种情况下,进一步的 request()
调用无效。
很少需要在这样的最终消费者中实际调用 request
,但是当您的最终消费者充当异步边界时,您可以使用它来避免缓冲区溢出:
ExecutorService executor = Executors.newSingleThreadedExecutor();
Flowable.range(1, 500)
.doOnNext(v -> Log.d("produced: " + v))
.subscribeOn(Schedulers.io())
.subscribe(new DisposableSubscriber<Long>() {
@Override protected void onStart() {
Log.d(TAG, "onStart: "); // <----- no super.onStart() here!
request(1);
}
@Override public void onNext(Long aLong) {
executor.execute(() -> {
Log.d(TAG, "onNext: ");
try {
Log.d(TAG, "consuming data :"+aLong);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(1);
});
}
@Override public void onError(Throwable t) {
executor.execute(() -> t.printStackTrace());
}
@Override public void onComplete() {
executor.execute(() -> Log.d("onComplete"));
}
});
Thread.sleep(100_000);
executor.shutdown();
谁能说出 DisposableSubscriber
中的 request()
方法的作用以及何时使用它?我们仅在您使用 Flowable.create
创建自己的 Flowable
时使用它?官方文档说
request(long n): Requests the specified amount from the upstream if its Subscription is set via onSubscribe already.
但我不明白这是什么意思。为了尝试,我制作了一个示例,如下所示
private Flowable<Long> streamOfNums() {
return Flowable.create(e -> {
for (int i = 0; i < 500; i++) {
e.onNext((long) i);
Log.d(TAG, "produced "+i);
}
}, BackpressureStrategy.BUFFER);
}
喜欢
streamOfNums()
.subscribeOn(Schedulers.io())
.subscribeWith(new DisposableSubscriber<Long>() {
@Override
protected void onStart() {
super.onStart();
Log.d(TAG, "onStart: ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: ");
try {
Log.d(TAG, "consuming data :"+aLong);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(4);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
我能看到的是每次发射器在给定的延迟(2000 毫秒)后产生数字。我给了 request(4)
但即使没有它,它也以完全相同的方式工作。
任何人都可以解释 request
什么时候做什么以及什么时候使用它。是否可以用于分页场景?
request
允许消费者告诉生产者要生产多少元素。默认情况下,DisposableSubscriber
在其 onStart()
方法中请求 Long.MAX_VALUE
,在这种情况下,进一步的 request()
调用无效。
很少需要在这样的最终消费者中实际调用 request
,但是当您的最终消费者充当异步边界时,您可以使用它来避免缓冲区溢出:
ExecutorService executor = Executors.newSingleThreadedExecutor();
Flowable.range(1, 500)
.doOnNext(v -> Log.d("produced: " + v))
.subscribeOn(Schedulers.io())
.subscribe(new DisposableSubscriber<Long>() {
@Override protected void onStart() {
Log.d(TAG, "onStart: "); // <----- no super.onStart() here!
request(1);
}
@Override public void onNext(Long aLong) {
executor.execute(() -> {
Log.d(TAG, "onNext: ");
try {
Log.d(TAG, "consuming data :"+aLong);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(1);
});
}
@Override public void onError(Throwable t) {
executor.execute(() -> t.printStackTrace());
}
@Override public void onComplete() {
executor.execute(() -> Log.d("onComplete"));
}
});
Thread.sleep(100_000);
executor.shutdown();