像运算符一样模仿 `reduceWhile`
mimic `reduceWhile` like operator
我有很多值需要 'merge' 直到它们满足特定条件。
我不想使用外部变量作为缓冲区来检查我的条件是否满足,但我找不到执行 'reduceWhile' 或 'bufferWhile' 操作的方法,因为我无权访问缓冲区(使用 .buffer
) 检查我是否有足够的数据将结果传递到下游。
我有:
ohlcIntfFlux
.reduce(OHLCIntf::mergeWith)
.filter(timeFrameProvider::isBarComplete)
.map(this::makeBar)
.subscribe(pub::next)
但这会减少整个流,然后检查过滤器。
我需要减少直到满足过滤器中的条件,然后将其传递到下游。
希望它足够清楚...
谢谢!
我是这样做的:
ConnectableFlux.create(pub ->
ohlcIntfFlux
.scan((i, acc) -> {
if (!timeFrameProvider.isBarComplete(acc))
return acc.mergeWith(i);
pub.next(makeBar(acc));
return i;
})
.subscribe()
)
它可以满足我的要求,但如果您能提出更实用的方法,我将很高兴听到
我有很多值需要 'merge' 直到它们满足特定条件。
我不想使用外部变量作为缓冲区来检查我的条件是否满足,但我找不到执行 'reduceWhile' 或 'bufferWhile' 操作的方法,因为我无权访问缓冲区(使用 .buffer
) 检查我是否有足够的数据将结果传递到下游。
我有:
ohlcIntfFlux
.reduce(OHLCIntf::mergeWith)
.filter(timeFrameProvider::isBarComplete)
.map(this::makeBar)
.subscribe(pub::next)
但这会减少整个流,然后检查过滤器。
我需要减少直到满足过滤器中的条件,然后将其传递到下游。
希望它足够清楚...
谢谢!
我是这样做的:
ConnectableFlux.create(pub ->
ohlcIntfFlux
.scan((i, acc) -> {
if (!timeFrameProvider.isBarComplete(acc))
return acc.mergeWith(i);
pub.next(makeBar(acc));
return i;
})
.subscribe()
)
它可以满足我的要求,但如果您能提出更实用的方法,我将很高兴听到