Rxjava: multible Observables,如果只有前一个失败,则执行下一个 observable
Rxjava: multible Observables, execute the next observable if only the previous failed
我有一个案例,我有多个 observable,每个 observable 都有自己的实现,它们可能具有相同的类型,或者不同的我还不确定,但我们假设它们是相同的类型.
Observable<String> source1;
Observable<String> source2;
Observable<String> source3;
Observable<String> source4;
我现在需要做的是只执行其中一个,所以如果前一个失败,stream 只会移动到下一个 observable。
一些可能的解决方案:
onErrorResumeNext() 如果只有两个可能会更好
observable,但在我这里,如果我需要更改执行顺序,则很难更新每个 observable。
有 combineLatest 但我不知道它是否像我一样工作
描述,或根据我的需要进行哪些修改。
如何实现这样的目标,如果它们具有不同的类型,我需要做什么?
我不知道是否有更好的方法,但我会在一些方法的帮助下使用 onErrorResumeNext()
使其更灵活:
Observable<String> buildObservable(Observable<String> obs, Observable<String>... subsequentObservables) {
Observable<String> observable = obs;
for (int i = 0; i < subsequentObservables.length; i++) {
observable = concatErrorObservable(observable, subsequentObservables[i]);
}
return observable;
}
其中 concatErrorObservable
是:
Observable<String> concatErrorObservable(Observable<String> observable, Observable<String> observable2) {
return observable.onErrorResumeNext(observable2);
}
所以你只需要提供 Observable
的列表给 buildObservable
方法。例如:
buildObservable(Observable.error(new Throwable("error!!")),
Observable.just("observable2"),
Observable.just("observable3"))
.subscribe(s -> Log.d(TAG, "result: " + s));
将打印 observable2
(在 logcat 中),因为第一个 observable 会抛出错误。
关于不同的类型,您可能需要为每个 Observable
设置不同的 map
,因为我认为您的消费者(观察者)只会期望一种类型的发射数据。
您可以像这样使用 onErrorResumeNext
和 reduce
获得组合的可观察值:
Observable<String> buildObservable(List<Observable<String>> observables) {
return Observable.fromIterable(observables)
.reduce(Observable::onErrorResumeNext)
.flatMapObservable(obs -> obs);
}
更新:
进一步解释,如果您使用列表 [o1, o2, o3]
调用该方法,则
fromIterable
将 return 一个 更高级别 可观察到等同于 just(o1, o2, o3)
reduce
将组合此可观察对象的元素,对每个元素依次调用 onErrorResumeNext()
,如下所示:
o1 -> o1.onErrorResumeNext(o2) -> o1.onErrorResumeNext(o2).onErrorResumeNext(o3),
导致静止的 "higher level" 1 元素可观察值,相当于 just(o1.onErrorResumeNext(o2).onErrorResumeNext(o3))
.
flatMapObservable()
行将用它的唯一元素本身替换这个 1 元素可观察对象,即 o1.onErrorResumeNext(o2).onErrorResumeNext(o3)
(没有 just()
).
这个结果实现了你需要的回退机制。
我有一个案例,我有多个 observable,每个 observable 都有自己的实现,它们可能具有相同的类型,或者不同的我还不确定,但我们假设它们是相同的类型.
Observable<String> source1;
Observable<String> source2;
Observable<String> source3;
Observable<String> source4;
我现在需要做的是只执行其中一个,所以如果前一个失败,stream 只会移动到下一个 observable。
一些可能的解决方案:
onErrorResumeNext() 如果只有两个可能会更好 observable,但在我这里,如果我需要更改执行顺序,则很难更新每个 observable。
有 combineLatest 但我不知道它是否像我一样工作 描述,或根据我的需要进行哪些修改。
如何实现这样的目标,如果它们具有不同的类型,我需要做什么?
我不知道是否有更好的方法,但我会在一些方法的帮助下使用 onErrorResumeNext()
使其更灵活:
Observable<String> buildObservable(Observable<String> obs, Observable<String>... subsequentObservables) {
Observable<String> observable = obs;
for (int i = 0; i < subsequentObservables.length; i++) {
observable = concatErrorObservable(observable, subsequentObservables[i]);
}
return observable;
}
其中 concatErrorObservable
是:
Observable<String> concatErrorObservable(Observable<String> observable, Observable<String> observable2) {
return observable.onErrorResumeNext(observable2);
}
所以你只需要提供 Observable
的列表给 buildObservable
方法。例如:
buildObservable(Observable.error(new Throwable("error!!")),
Observable.just("observable2"),
Observable.just("observable3"))
.subscribe(s -> Log.d(TAG, "result: " + s));
将打印 observable2
(在 logcat 中),因为第一个 observable 会抛出错误。
关于不同的类型,您可能需要为每个 Observable
设置不同的 map
,因为我认为您的消费者(观察者)只会期望一种类型的发射数据。
您可以像这样使用 onErrorResumeNext
和 reduce
获得组合的可观察值:
Observable<String> buildObservable(List<Observable<String>> observables) {
return Observable.fromIterable(observables)
.reduce(Observable::onErrorResumeNext)
.flatMapObservable(obs -> obs);
}
更新:
进一步解释,如果您使用列表 [o1, o2, o3]
调用该方法,则
fromIterable
将 return 一个 更高级别 可观察到等同于just(o1, o2, o3)
reduce
将组合此可观察对象的元素,对每个元素依次调用onErrorResumeNext()
,如下所示:o1 -> o1.onErrorResumeNext(o2) -> o1.onErrorResumeNext(o2).onErrorResumeNext(o3),
导致静止的 "higher level" 1 元素可观察值,相当于
just(o1.onErrorResumeNext(o2).onErrorResumeNext(o3))
.flatMapObservable()
行将用它的唯一元素本身替换这个 1 元素可观察对象,即o1.onErrorResumeNext(o2).onErrorResumeNext(o3)
(没有just()
).
这个结果实现了你需要的回退机制。