像运算符一样模仿 `reduceWhile`

mimic `reduceWhile` like operator

我有很多值需要 'merge' 直到它们满足特定条件。 我不想使用外部变量作为缓冲区来检查我的条件是否满足,但我找不到执行 'reduceWhile' 或 'bufferWhile' 操作的方法,因为我无权访问缓冲区(使用 .buffer) 检查我是否有足够的数据将结果传递到下游。

我有:

                ohlcIntfFlux
                        .reduce(OHLCIntf::mergeWith)
                        .filter(timeFrameProvider::isBarComplete)
                        .map(this::makeBar)
                        .subscribe(pub::next)

但这会减少整个流,然后检查过滤器。

我需要减少直到满足过滤器中的条件,然后将其传递到下游。

希望它足够清楚...

谢谢!

我是这样做的:

ConnectableFlux.create(pub ->
                ohlcIntfFlux
                        .scan((i, acc) -> {
                            if (!timeFrameProvider.isBarComplete(acc))
                                return acc.mergeWith(i);

                            pub.next(makeBar(acc));
                            return i;
                        })
                        .subscribe()
        )

它可以满足我的要求,但如果您能提出更实用的方法,我将很高兴听到