RxJava:如何处理 zip 运算符的错误?
RxJava : How to handle error with zip operator ?
我正在使用带有 Retrofit2 的 RxJava 和 RxAndroid。
Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
在上面两个Observer 上使用如下zip 运算符。
Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>() {
@Override
public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo) {
ArrayList<TestData> testDataList = new ArrayList();
// Add test data from response responseOne & responseTwo
return testDataList;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<ArrayList<TestData>>() {
@Override
public void onNext(ArrayList<TestData> testDataList) {
}
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted" );
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError Throwable: " + t.toString() );
}
});
如果在responseOneObservable
和responseTwoObservable
中retrofit http调用有错误,则直接调用testDataObservable
订阅者的onError
方法。[=18] =]
我想继续使用 zip 运算符的 call
方法,即使两个可观察对象中的任何一个都获得了成功响应。
如何使用 zip
运算符处理错误响应?
您应该在单个压缩的可观察对象上使用 onErrorResumeNext
来指示它们在出现错误时发出默认项。
您可以使用 onErrorResumeNext
运算符 return 来自两个可观察对象之一的默认响应。
Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.onErrorResumeNext(throwable -> {/*some default value*/})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.onErrorResumeNext(throwable -> {/*some default value*/})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
你可以使用 onErrorResumeNext
到 return 一些 Observable 或者 onErrorReturn
到 return 一些默认值到 zip
,比如:
Observable.zip(
responseOneObservable
.onErrorReturn(new Func1<Throwable, ResponseOne>() {
@Override
public ResponseOne call(final Throwable throwable) {
return new ResponseOne();
}
}),
responseTwoObservable
.onErrorReturn(new Func1<Throwable, ResponseTwo>() {
@Override
public ResponseTwo call(final Throwable throwable) {
return new ResponseTwo();
}
}),
...
有关详细信息,请参阅 onError handling。
更新: 在 RxJava 2.0 中你必须使用 Function
而不是 Func1
:
import io.reactivex.functions.Function;
...
Observable.zip(
responseOneObservable
.onErrorReturn(new Function<Throwable, ResponseOne>() {
@Override
public ResponseOne apply(@NonNull final Throwable throwable) {
return new ResponseOne();
}
}),
responseTwoObservable
.onErrorReturn(new Function<Throwable, ResponseTwo>() {
@Override
public ResponseTwo apply(@NonNull final Throwable throwable) {
return new ResponseTwo();
}
}),
...
或使用 lambdas:
Observable.zip(
responseOneObservable
.onErrorReturn(throwable -> new ResponseOne()),
responseTwoObservable
.onErrorReturn(throwable -> new ResponseTwo()),
...
或使用 Kotlin:
Observable.zip(
responseOneObservable
.onErrorReturn { ResponseOne() },
responseTwoObservable
.onErrorReturn { ResponseTwo() },
...
我正在使用带有 Retrofit2 的 RxJava 和 RxAndroid。
Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
在上面两个Observer 上使用如下zip 运算符。
Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>() {
@Override
public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo) {
ArrayList<TestData> testDataList = new ArrayList();
// Add test data from response responseOne & responseTwo
return testDataList;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<ArrayList<TestData>>() {
@Override
public void onNext(ArrayList<TestData> testDataList) {
}
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted" );
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError Throwable: " + t.toString() );
}
});
如果在responseOneObservable
和responseTwoObservable
中retrofit http调用有错误,则直接调用testDataObservable
订阅者的onError
方法。[=18] =]
我想继续使用 zip 运算符的 call
方法,即使两个可观察对象中的任何一个都获得了成功响应。
如何使用 zip
运算符处理错误响应?
您应该在单个压缩的可观察对象上使用 onErrorResumeNext
来指示它们在出现错误时发出默认项。
您可以使用 onErrorResumeNext
运算符 return 来自两个可观察对象之一的默认响应。
Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.onErrorResumeNext(throwable -> {/*some default value*/})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.onErrorResumeNext(throwable -> {/*some default value*/})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
你可以使用 onErrorResumeNext
到 return 一些 Observable 或者 onErrorReturn
到 return 一些默认值到 zip
,比如:
Observable.zip(
responseOneObservable
.onErrorReturn(new Func1<Throwable, ResponseOne>() {
@Override
public ResponseOne call(final Throwable throwable) {
return new ResponseOne();
}
}),
responseTwoObservable
.onErrorReturn(new Func1<Throwable, ResponseTwo>() {
@Override
public ResponseTwo call(final Throwable throwable) {
return new ResponseTwo();
}
}),
...
有关详细信息,请参阅 onError handling。
更新: 在 RxJava 2.0 中你必须使用 Function
而不是 Func1
:
import io.reactivex.functions.Function;
...
Observable.zip(
responseOneObservable
.onErrorReturn(new Function<Throwable, ResponseOne>() {
@Override
public ResponseOne apply(@NonNull final Throwable throwable) {
return new ResponseOne();
}
}),
responseTwoObservable
.onErrorReturn(new Function<Throwable, ResponseTwo>() {
@Override
public ResponseTwo apply(@NonNull final Throwable throwable) {
return new ResponseTwo();
}
}),
...
或使用 lambdas:
Observable.zip(
responseOneObservable
.onErrorReturn(throwable -> new ResponseOne()),
responseTwoObservable
.onErrorReturn(throwable -> new ResponseTwo()),
...
或使用 Kotlin:
Observable.zip(
responseOneObservable
.onErrorReturn { ResponseOne() },
responseTwoObservable
.onErrorReturn { ResponseTwo() },
...