RxJava: How to handle errors with the zip operator?

I am using RxJava and RxAndroid with Retrofit2.

Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

I combine the two Observables above with the zip operator as follows.

Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>() {
        @Override
        public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo) {
            ArrayList<TestData> testDataList = new ArrayList<>();
            // Add test data from responseOne and responseTwo
            return testDataList;
        }
    });

// subscribe() returns a Subscription, so it is called separately from the declaration above
testDataObservable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<ArrayList<TestData>>() {

        @Override
        public void onNext(ArrayList<TestData> testDataList) {

        }

        @Override
        public void onCompleted() {
            Log.d(TAG, "onCompleted" );
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "onError Throwable: " + t.toString() );
        }
    });

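If the Retrofit HTTP call in either responseOneObservable or responseTwoObservable fails, the onError method of the testDataObservable subscriber is called directly.

I want execution to continue into the zip operator's call method even if only one of the two observables gets a successful response.

How can I handle an error response when using the zip operator?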

You should use onErrorResumeNext on each of the individual observables being zipped, so that they emit a default item when an error occurs.

See Error-Handling-Operators
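
If it helps to see the idea in isolation, here is a minimal sketch of "give each source its own fallback before combining" that runs without RxJava. It uses the JDK's CompletableFuture as a stand-in: exceptionally plays roughly the role of onErrorReturn, and thenCombine the role of zip for single values. The fetch helper, the class name, and the string payloads are made up for illustration; this is an analogy, not RxJava's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ZipFallbackSketch {

    // Hypothetical stand-in for a Retrofit call; fails when `fail` is true.
    static CompletableFuture<String> fetch(String name, boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException(name + " failed");
            }
            return name + " data";
        });
    }

    // Combines both results; each source substitutes a default if it fails,
    // so the combiner still runs when only one call succeeds.
    static List<String> zipWithFallback(boolean failOne, boolean failTwo) {
        CompletableFuture<String> one = fetch("one", failOne)
                .exceptionally(t -> "default one"); // roughly onErrorReturn
        CompletableFuture<String> two = fetch("two", failTwo)
                .exceptionally(t -> "default two");

        // roughly zip for two single-valued sources
        return one.thenCombine(two, (a, b) -> {
            List<String> combined = new ArrayList<>();
            combined.add(a);
            combined.add(b);
            return combined;
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(zipWithFallback(false, false)); // [one data, two data]
        System.out.println(zipWithFallback(true, false));  // [default one, two data]
    }
}
```

Without the per-source exceptionally calls, a failure in either future would make the combined future fail as a whole, which mirrors what the question describes for zip.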

You can use the onErrorResumeNext operator to return a default response from either of the two observables.

Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
    // onErrorResumeNext expects an Observable; wrap some default value in Observable.just
    .onErrorResumeNext(throwable -> Observable.just(new ResponseOne() /* some default value */))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
    .onErrorResumeNext(throwable -> Observable.just(new ResponseTwo() /* some default value */))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

See also Error handling in RxJava - Dan Lew

You can use onErrorResumeNext to return some Observable, or onErrorReturn to return some default value to zip, for example:

Observable.zip(
   responseOneObservable
       .onErrorReturn(new Func1<Throwable, ResponseOne>() {
        @Override
        public ResponseOne call(final Throwable throwable) {
            return new ResponseOne();
        }
    }),
   responseTwoObservable
       .onErrorReturn(new Func1<Throwable, ResponseTwo>() {
        @Override
        public ResponseTwo call(final Throwable throwable) {
            return new ResponseTwo();
        }
    }),
   ...

For more information, see onError handling


Update: In RxJava 2.0 you have to use Function instead of Func1:

import io.reactivex.functions.Function;
...
Observable.zip(
   responseOneObservable
       .onErrorReturn(new Function<Throwable, ResponseOne>() {
        @Override
        public ResponseOne apply(@NonNull final Throwable throwable) {
            return new ResponseOne();
        }
    }),
   responseTwoObservable
       .onErrorReturn(new Function<Throwable, ResponseTwo>() {
        @Override
        public ResponseTwo apply(@NonNull final Throwable throwable) {
            return new ResponseTwo();
        }
    }),
   ...

Or using lambdas:

Observable.zip(
   responseOneObservable
       .onErrorReturn(throwable -> new ResponseOne()),
   responseTwoObservable
       .onErrorReturn(throwable -> new ResponseTwo()),
   ...

Or using Kotlin:

Observable.zip(
   responseOneObservable
       .onErrorReturn { ResponseOne() },
   responseTwoObservable
       .onErrorReturn { ResponseTwo() },
   ...
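
One thing to keep in mind with a default such as new ResponseOne(): inside the zip function you cannot easily tell a fallback from a real response. A possible workaround, sketched below under assumptions, is to have each source emit an Optional, with an empty Optional meaning "this call failed". In RxJava that would presumably be wired with something like .map(Optional::of).onErrorReturn(t -> Optional.empty()) on each source; that wiring is not shown or tested here, and java.util.Optional may not be available on older Android API levels. The sketch only shows the combining function; the class name and the String payloads are placeholders for the real response types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class PartialZipSketch {

    // Plays the role of the zip function: an empty Optional marks a failed source,
    // so only the responses that actually arrived contribute to the result.
    static List<String> combine(Optional<String> one, Optional<String> two) {
        List<String> testDataList = new ArrayList<>();
        one.ifPresent(testDataList::add);
        two.ifPresent(testDataList::add);
        return testDataList;
    }

    public static void main(String[] args) {
        // Second source failed, first succeeded
        System.out.println(combine(Optional.of("one"), Optional.empty())); // [one]
    }
}
```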