如何在 RxJava2 中批处理链中的 Flowable 输出

How to batch process a Flowable output in a chain in RxJava2

如何在 RxJava2 中将要批量处理的进程链接起来。下面的流程图是我想要实现的。

Flowable#1          Flowable#2 (process every 10)
==============     ================================
callServer(p1) ->
      :        ->  saveToDatabase(List<r1 to r10>)
callServer(p20)->  saveToDatabase(List<r11 to r20>)
callServer(p21)->      :
      :                :
callServer(p35)->  saveToDatabase(List<r31 to r35>) //the remainder

目前,我要做的是等待所有结果返回后再保存到数据库中。

Flowable.fromIterable(paramList)
    .map(p -> callServer(p)) 
    //wait for the return a map of ALL the results r  
    //how to chain it such that saveToDatabase process after 'n' results 
    .toList()  
    .flatmap(listOfR -> saveToDatabase(listOfR); 

如何做到在每个 'n' 结果后调用 saveToDatabase 而不是等待所有结果完成?

使用带有 n 作为参数的 buffer() 运算符,缓冲区将从源中收集 n 项 Observable 并发出包含 n 项的列表。
所以你可以一次处理每n个项目,并将它们保存到数据库