Rxjava: multible Observables,如果只有前一个失败,则执行下一个 observable

Rxjava: multible Observables, execute the next observable if only the previous failed

我有一个案例,我有多个 observable,每个 observable 都有自己的实现,它们可能具有相同的类型,或者不同的我还不确定,但我们假设它们是相同的类型.

Observable<String> source1;
Observable<String> source2;
Observable<String> source3;
Observable<String> source4;

我现在需要做的是只执行其中一个,所以如果前一个失败,stream 只会移动到下一个 observable。

一些可能的解决方案:

如何实现这样的目标,如果它们具有不同的类型,我需要做什么?

我不知道是否有更好的方法,但我会在一些方法的帮助下使用 onErrorResumeNext() 使其更灵活:

Observable<String> buildObservable(Observable<String> obs, Observable<String>... subsequentObservables) {
    Observable<String> observable = obs;
    for (int i = 0; i < subsequentObservables.length; i++) {
        observable = concatErrorObservable(observable, subsequentObservables[i]);
    }

    return observable;
}

其中 concatErrorObservable 是:

Observable<String> concatErrorObservable(Observable<String> observable, Observable<String> observable2) {
        return observable.onErrorResumeNext(observable2);
    }

所以你只需要提供 Observable 的列表给 buildObservable 方法。例如:

buildObservable(Observable.error(new Throwable("error!!")), 
    Observable.just("observable2"), 
    Observable.just("observable3"))
.subscribe(s -> Log.d(TAG, "result: " + s));

将打印 observable2(在 logcat 中),因为第一个 observable 会抛出错误。

关于不同的类型,您可能需要为每个 Observable 设置不同的 map,因为我认为您的消费者(观察者)只会期望一种类型的发射数据。

您可以像这样使用 onErrorResumeNextreduce 获得组合的可观察值:

Observable<String> buildObservable(List<Observable<String>> observables) {
    return Observable.fromIterable(observables)
            .reduce(Observable::onErrorResumeNext)
            .flatMapObservable(obs -> obs);
}

更新: 进一步解释,如果您使用列表 [o1, o2, o3] 调用该方法,则

  • fromIterable 将 return 一个 更高级别 可观察到等同于 just(o1, o2, o3)

  • reduce 将组合此可观察对象的元素,对每个元素依次调用 onErrorResumeNext(),如下所示:

    o1 -> o1.onErrorResumeNext(o2) -> o1.onErrorResumeNext(o2).onErrorResumeNext(o3), 
    

    导致静止的 "higher level" 1 元素可观察值,相当于 just(o1.onErrorResumeNext(o2).onErrorResumeNext(o3)).

  • flatMapObservable() 行将用它的唯一元素本身替换这个 1 元素可观察对象,即 o1.onErrorResumeNext(o2).onErrorResumeNext(o3)(没有 just()).

这个结果实现了你需要的回退机制。