RxJava2 中的背压

Back pressure in RxJava2

假设我在数据库中有100万行记录。然后我使用 Flowable(而不是 Observable)读取记录,然后将它们写入加密写入速度较慢的文件(写入比读取慢得多)。

如果我在写的过程中不使用"reactive pull",会不会在写到一半的时候,我的内存就被那100万条记录填满了?换句话说,是否可能出现内存不足异常?

如果是,使用 "reactive pull" 是防止该异常的唯一方法,如果我也不想使用节流等运算符

这实际上取决于 Flowable 的创建方式。例如,如果 Flowable 是使用 fromIterable 创建的,而 Iterator 仅在调用 next 时从结果集中请求新行,那么将为您处理背压。

另一方面,如果 Flowable 是用 create 创建的,那么行为将取决于提供给 createBackpressureStrategy。如果策略是 BUFFER 并且 FlowableEmitter 只是不断推送项目,则可能会发生内存不足。理想情况下,Flowable 应该使用自然处理背压的工厂方法创建,如果不可能,则使用其他 BackpressureStrategy,如 DROP 或自定义 onBackpressureXXX 运算符。如果其他一切都失败了,可以在某些外部库中创建或找到自定义 Flowable 实现。