在 RxJava2 中使用背压将 Observable 转换为 Flowable

Converting an Observable to a Flowable with backpressure in RxJava2

我正在观察 NetworkResource 生成的线条,将其包裹在 Observable.create 中。这是代码,为简单起见,缺少 try/catch 和取消:

fun linesOf(resource: NetworkResource): Observable<String> =
        Observable.create { emitter ->
            while (!emitter.isDisposed) {
                val line = resource.readLine()
                Log.i(TAG, "Emitting: $line")
                emitter.onNext(line)
            }
        }

问题是稍后我想使用 observable.toFlowable(LATEST) 将它变成 Flowable 以增加背压以防我的消费者无法跟上,但取决于我如何做,消费者在商品 128 之后停止接收商品。

A)这样一切正常:

val resource = ...
linesOf(resource)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribe { Log.i(TAG, "Consuming: $it") }

B) 这里消费者在 128 个项目后卡住(但发射继续):

val resource = ...
linesOf(resource)
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128

在选项 A) 中一切正常,我可以看到 Emitting: ... 日志与 Consuming: ... 日志并排显示。

在选项 B) 中,我可以看到 Emitting: ... 日志消息愉快地发出新行,但我在第 128 项之后不再看到 Consuming: ... 日志消息,即使发射仍在继续。

问题:谁能帮我理解为什么会这样?

首先,您使用了错误的类型和错误的运算符。使用 Flowable 消除了转换的需要。使用 Flowable.generate 会产生背压:

Flowable.generate(emitter -> {
    String line = resource.readLine();
    if (line == null) {
        emitter.onComplete();
    } else {
        emitter.onNext(line);
    }
});

其次,您的版本挂起的原因是由于 subscribeOn 导致的同一个池死锁。来自下游的请求被安排在您急切的发射循环之后并且无法生效,在默认的 128 个元素处停止发射。使用 Flowable.subscribeOn(scheduler, false) 来避免这种情况。