RxJava2 可观察背压
RxJava2 Observable backpressure
最近我意识到我不明白 RxJava2
背压是如何工作的。
我做了小测试,我希望它会失败并出现 MissingBackpressureException
异常:
@Test
public void testBackpressureWillFail() {
Observable.<Integer>create(e -> {
for (int i = 0; i < 10000; i++) {
System.out.println("Emit: " + i);
e.onNext(i);
}
e.onComplete();
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.doOnNext(i -> {
Thread.sleep(100);
System.out.println("Processed:" + i);
})
.blockingSubscribe();
}
接下来系统输出显示:
Emit: 0
Emit: 1
Emit: 2
...
Emit: 10000
Processed:0
Processed:1
Processed:2
...
Processed:10000
为什么它不产生 MissingBackpressureException
。
我预计 e.onNext(i);
会将项目放入 ObservableObserveOn
的缓冲区,并且在它的大小大于 static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());
之后
它应该抛出 MissingBackpressureException
但实际上并没有发生。缓冲区会自动增长吗?如果不是,物品存放在哪里?
那是因为背压移到了 Flowable
只有 RxJava2,见 here。
如果您使用 BackpressureStrategy.MISSING
切换到 Flowable
,您将得到异常。
这也意味着在你的情况下你确实有自动增长的缓冲区,
来自 observerOn
文档:
Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer...
最近我意识到我不明白 RxJava2
背压是如何工作的。
我做了小测试,我希望它会失败并出现 MissingBackpressureException
异常:
@Test
public void testBackpressureWillFail() {
Observable.<Integer>create(e -> {
for (int i = 0; i < 10000; i++) {
System.out.println("Emit: " + i);
e.onNext(i);
}
e.onComplete();
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.doOnNext(i -> {
Thread.sleep(100);
System.out.println("Processed:" + i);
})
.blockingSubscribe();
}
接下来系统输出显示:
Emit: 0
Emit: 1
Emit: 2
...
Emit: 10000
Processed:0
Processed:1
Processed:2
...
Processed:10000
为什么它不产生 MissingBackpressureException
。
我预计 e.onNext(i);
会将项目放入 ObservableObserveOn
的缓冲区,并且在它的大小大于 static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());
它应该抛出 MissingBackpressureException
但实际上并没有发生。缓冲区会自动增长吗?如果不是,物品存放在哪里?
那是因为背压移到了 Flowable
只有 RxJava2,见 here。
如果您使用 BackpressureStrategy.MISSING
切换到 Flowable
,您将得到异常。
这也意味着在你的情况下你确实有自动增长的缓冲区,
来自 observerOn
文档:
Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer...