使用一个 Observable 为另一个 Observable 计时
Using one Observable to clock another Observable
我们需要 "clock" 一个基于另一个的热 observable,例如:
main: --(1)-----(2)-----(3)-----(4)-------(5)----(6)------------|
clock: -------(X)------------(X)------(X)--------------(X)-------|
______________________________________________________________________________
expected result: -------(1)(2)-----(3)-(3)-(4)--(4)--(5)----(6)--(6)-------|
更具体地说,每当 "clock" 发出一个项目时,另一个 observable 的最后一个项目就会再次发出。项目发射被延迟,直到两个可观察对象都发射了至少一个项目。
现在,我们通过以下方式实现这一点:
<T, U> Observable<T> clock(Observable<T> main, Observable<U> clock) {
return Observable.combineLatest(main, clock, (mainItem, clockItem) -> mainItem);
}
这看起来有点傻,因为我们正在应用一个转换,然后只丢弃其中一个输入。此外,我们偶尔会遇到这种解决方案的背压问题。
似乎会有一个现有的 Rx 运算符执行此操作,但到目前为止我还没有找到正确的 API 方法来执行此操作。
有没有更好、更惯用的方法来用 RxJava 解决这个问题?
我觉得不错。这并不是说 clock
输入没有被使用,因为虽然这个值没有被使用,但时间已经被使用了。
为了处理热源上的背压,您可能需要应用 onBackpressureXXX
运算符之一。例如,如果您不想错过输出,则使用 .onBackpressureBuffer()
。另一种策略是使用 .throttle
或 .sample
.
我们需要 "clock" 一个基于另一个的热 observable,例如:
main: --(1)-----(2)-----(3)-----(4)-------(5)----(6)------------|
clock: -------(X)------------(X)------(X)--------------(X)-------|
______________________________________________________________________________
expected result: -------(1)(2)-----(3)-(3)-(4)--(4)--(5)----(6)--(6)-------|
更具体地说,每当 "clock" 发出一个项目时,另一个 observable 的最后一个项目就会再次发出。项目发射被延迟,直到两个可观察对象都发射了至少一个项目。 现在,我们通过以下方式实现这一点:
<T, U> Observable<T> clock(Observable<T> main, Observable<U> clock) {
return Observable.combineLatest(main, clock, (mainItem, clockItem) -> mainItem);
}
这看起来有点傻,因为我们正在应用一个转换,然后只丢弃其中一个输入。此外,我们偶尔会遇到这种解决方案的背压问题。 似乎会有一个现有的 Rx 运算符执行此操作,但到目前为止我还没有找到正确的 API 方法来执行此操作。 有没有更好、更惯用的方法来用 RxJava 解决这个问题?
我觉得不错。这并不是说 clock
输入没有被使用,因为虽然这个值没有被使用,但时间已经被使用了。
为了处理热源上的背压,您可能需要应用 onBackpressureXXX
运算符之一。例如,如果您不想错过输出,则使用 .onBackpressureBuffer()
。另一种策略是使用 .throttle
或 .sample
.