原因:rx.exceptions.MissingBackpressureException

Caused by: rx.exceptions.MissingBackpressureException

我还有一个问题。这次我在执行这段代码的过程中遇到了这个错误Caused by: rx.exceptions.MissingBackpressureException

class UpdateHelper {
val numberOfFileToUpdate: PublishSubject<Int>

init {
    numberOfFileToUpdate = PublishSubject.create()
}

public fun startUpdate(): Observable<Int>{
    return getProducts().flatMap { products: ArrayList<Product> ->
            numberOfFileToUpdate.onNext(products.size)
            return@flatMap saveRows(products)
        }
}

private fun getProducts(): Observable<ArrayList<Product>> {
    return Observable.create {
        var products: ArrayList<Product> = ArrayList()
        var i = 0
        while (i++ < 100) {
            products.add(Product())
        }

        it.onNext(products)
        it.onCompleted()
    }
}


private fun saveRows(products: ArrayList<Product>): Observable<Int> {
    return Observable.create<Int> {
        var totalNumberOfRow = products.size

        while (totalNumberOfRow-- > 0){
            it.onNext(products.size - totalNumberOfRow)
            Thread.sleep(100)
        }
        it.onCompleted()
    }
}

}

代码只是两个进程的测试代码。第一个过程从网络上获取 Product 的列表,然后将这些产品保存到应用程序内的本地数据库中。这是主要思想。

方法getProducts 完成获取数据的工作,在本例中我只创建了一个包含 100 个产品的 ArrayList。 saveRows 做持久化工作。

saveRows 方法发出一个 Int 表示保存的行。我这样做是因为在 UI 我有一个报告进度的进度条。

我从应用程序的另一个点调用方法 startUpdate,在发出一些项目后,我得到描述异常

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows.call(UpdateHelper.kt:46)

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows.call(UpdateHelper.kt:40)

我理解为什么会出现此异常https://github.com/ReactiveX/RxJava/wiki/Backpressure,但我不知道我做错了什么或如何解决它。

谁能给我一些建议。

问题是您的 Observable 源发出的速度比消费者消耗的快。保存每个产品需要 100 毫秒。您可以添加 onBackpressureBuffer()。

UpdateHelper().startUpdate()
    .onBackpressureBuffer() // Add this
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
      Log.d(TAG, "next $it")
    }, {
      Log.d(TAG, it.message)
    }, {
    })

此外,您可以尝试删除 Thread.sleep(100)

flatmap 使用 OperatorMergemerge(map(func))):您可以看到,在您的情况下,map 的 onNext 发送速度比请求的要快。