Reactive Java:根据两个项目的比较在流中插入项目

Reactive Java: Insert item in stream based on comparison of two items

如果通过比较流中的两个项目满足条件,我想在流中插入项目。流表示Observable<Object>

图中看起来像

10 -> 20 -> 40 -> 50 -> 70 -> 90 (input stream)
|if two items are more than 10 apart, insert item that is 10 times the previous item in comparison, both comparison items are always emitted|
10 -> 20 -> 200 -> 40 -> 50 -> 500 -> 70 -> 700 -> 90 (output stream)

以上只是一个例子。实际上,我想要一个从上一个项目映射而来的通用插入项目。

谢谢。

在 Rx 中,通常的做法是使用 flatMap/concatMap and composing observables. In your case you also need to use scan 或 zipWith 来访问当前元素和上一个元素(你也可以使用状态,但这不是 Rx-y ):

sourceObs.startWith(0).zip(sourceObs, (prev, next) ->
    next > prev+10 ?
        Observable.just(prev*10, next) :
        Observable.just(next))
    .concatMap(o -> o);