在 RxJava2 中使用背压将 Observable 转换为 Flowable
Converting an Observable to a Flowable with backpressure in RxJava2
我正在观察 NetworkResource
生成的线条,将其包裹在 Observable.create
中。这是代码,为简单起见,缺少 try/catch 和取消:
fun linesOf(resource: NetworkResource): Observable<String> =
Observable.create { emitter ->
while (!emitter.isDisposed) {
val line = resource.readLine()
Log.i(TAG, "Emitting: $line")
emitter.onNext(line)
}
}
问题是稍后我想使用 observable.toFlowable(LATEST)
将它变成 Flowable
以增加背压以防我的消费者无法跟上,但取决于我如何做,消费者在商品 128 之后停止接收商品。
A)这样一切正常:
val resource = ...
linesOf(resource)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.toFlowable(BackpressureStrategy.LATEST)
.subscribe { Log.i(TAG, "Consuming: $it") }
B) 这里消费者在 128 个项目后卡住(但发射继续):
val resource = ...
linesOf(resource)
.toFlowable(BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128
在选项 A) 中一切正常,我可以看到 Emitting: ...
日志与 Consuming: ...
日志并排显示。
在选项 B) 中,我可以看到 Emitting: ...
日志消息愉快地发出新行,但我在第 128 项之后不再看到 Consuming: ...
日志消息,即使发射仍在继续。
问题:谁能帮我理解为什么会这样?
首先,您使用了错误的类型和错误的运算符。使用 Flowable
消除了转换的需要。使用 Flowable.generate
会产生背压:
Flowable.generate(emitter -> {
String line = resource.readLine();
if (line == null) {
emitter.onComplete();
} else {
emitter.onNext(line);
}
});
其次,您的版本挂起的原因是由于 subscribeOn
导致的同一个池死锁。来自下游的请求被安排在您急切的发射循环之后并且无法生效,在默认的 128 个元素处停止发射。使用 Flowable.subscribeOn(scheduler, false)
来避免这种情况。
我正在观察 NetworkResource
生成的线条,将其包裹在 Observable.create
中。这是代码,为简单起见,缺少 try/catch 和取消:
fun linesOf(resource: NetworkResource): Observable<String> =
Observable.create { emitter ->
while (!emitter.isDisposed) {
val line = resource.readLine()
Log.i(TAG, "Emitting: $line")
emitter.onNext(line)
}
}
问题是稍后我想使用 observable.toFlowable(LATEST)
将它变成 Flowable
以增加背压以防我的消费者无法跟上,但取决于我如何做,消费者在商品 128 之后停止接收商品。
A)这样一切正常:
val resource = ...
linesOf(resource)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.toFlowable(BackpressureStrategy.LATEST)
.subscribe { Log.i(TAG, "Consuming: $it") }
B) 这里消费者在 128 个项目后卡住(但发射继续):
val resource = ...
linesOf(resource)
.toFlowable(BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128
在选项 A) 中一切正常,我可以看到 Emitting: ...
日志与 Consuming: ...
日志并排显示。
在选项 B) 中,我可以看到 Emitting: ...
日志消息愉快地发出新行,但我在第 128 项之后不再看到 Consuming: ...
日志消息,即使发射仍在继续。
问题:谁能帮我理解为什么会这样?
首先,您使用了错误的类型和错误的运算符。使用 Flowable
消除了转换的需要。使用 Flowable.generate
会产生背压:
Flowable.generate(emitter -> {
String line = resource.readLine();
if (line == null) {
emitter.onComplete();
} else {
emitter.onNext(line);
}
});
其次,您的版本挂起的原因是由于 subscribeOn
导致的同一个池死锁。来自下游的请求被安排在您急切的发射循环之后并且无法生效,在默认的 128 个元素处停止发射。使用 Flowable.subscribeOn(scheduler, false)
来避免这种情况。