RxJava 将 Observable 与另一个可选的带超时的 Observable 结合起来

RxJava combine Observable with another optional Observable with timeout

假设我们有两个可观测值 ABA 肯定会发布结果,而 B 的结果可能根本不会发布(超时)。

问题是如何映射 AB 的结果,如果 B returns 在一个时间范围内,否则只是 return 结果来自 A.

Observable<DatabaseObject> A = getDatabaseElement();
Observable<NetworkObject> B = restApi.getElement();

地图示例:

map((databaseObject, networkObject) => {
  databaseObject.setData(networkObject);
  return databaseObject;
})

为了超时 B observable 使用带有时间参数的 take 运算符:

B.take(10, TimeUnit.SECONDS)

为了接收 AB(如果 B 在超时内就绪)使用 concatWith:

A.concatWith(B.take(10, TimeUnit.SECONDS))
    .takeLast(1)

如果您希望合并 AB(可选择使用 B 丰富 A):

A.concatWith(B.take(10, TimeUnit.SECONDS))
    .reduce((a, b) -> a.setData(b))

如果 AB 是不同的类型(可选地用 B 丰富 A):

Observable.combineLatest(
    A,
    B.take(10, TimeUnit.SECONDS).defaultIfEmpty(stubB)),
    (a, b) -> { if (b != stubB) a.setData(b); }
)

您可能想在这里采用不同的方法。有一个运算符专门针对您描述的行为而设计。看一下 .timeout(long time, TimeUnits units, Observable backupObservable),当原始流中没有指定数量的其他项目时,它指示流从原始可观察对象(在你的情况 B 中,我假设的网络请求)切换到备份 A 可观察对象的时间。