RxJava + 改造 + 轮询
RxJava + Retrofit + polling
我有一个 Retrofit 调用,想每 30 秒调用一次。为此,我使用 Observable.interval(0, 30, TimeUnit.SECONDS)
Observable
.interval(0, 30, TimeUnit.SECONDS)
.flatMap(x -> RestApi.instance().getUsers())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(list -> {
// ...
},
error -> Timber.e(error, "can't load users"));
我的问题:如果 api 调用失败,将调用 onError
并且订阅取消订阅并且轮询不再工作:-(
为了捕获 api 错误,我添加了一个 retryWhen
Observable
.interval(0, 30, TimeUnit.SECONDS)
.flatMap(x -> RestApi.instance().getUsers()
.retryWhen(errors -> errors
.flatMap(error -> Observable.timer(15, TimeUnit.SECONDS))))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(list -> {
// ...
},
error -> Timber.e(error, "can't load users"));
这会捕获错误,但我在一段时间内收到多个 api 调用。每 30 秒我都会收到一个新的轮询信号,该信号以新的 api 请求结束。但是,如果 api 请求失败,它会自行重试。所以我有一个新请求加上所有重试。
我的问题:如何在不取消订阅轮询信号的情况下处理 api 错误?
您可以使用 onErrorResumeNext 或 onExceptionResumeNext 并向其传递“错误”值。
您可以根据需要寻找其他错误处理方法here
如果您希望您的 getUsers
请求在请求失败时不以 onError
结束,而不是 returning Observable<yourUserType> getUsers()
,请将其设为 return一个Observable<Response<yourUserType>> getUsers()
。这样您就可以拦截 Response
对象中的网络错误。
此方法仅在您使用改装时有效 2.x
阅读如何正确使用 retryWhen
和 repeatWhen
。
http://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/
以及如何使用 onError
运算符:
http://blog.danlew.net/2015/12/08/error-handling-in-rxjava/
使用 Rx 真的很简单 :) 我不会给你一个最终的解决方案,只是尝试一下并尝试理解这里的流程。
您可以使用这段代码,在这段代码中实现请求之间的尝试次数和时间延迟
private static int COUNTER_START = 1;
private static final int ATTEMPTS = 6;
private static final int ORIGINAL_DELAY_IN_SECONDS = 2;
remoteData.getAllRides(idSearch)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
return observable.flatMap(new Func1<Void, Observable<?>>() {
@Override
public Observable<?> call(Void aVoid) {
if(COUNTER_START > ATTEMPTS){
throw new RuntimeException();
}
COUNTER_START++;
return Observable.timer(ORIGINAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
}
});
}
})
.takeUntil(new Func1<RideResponse, Boolean>() {
@Override
public Boolean call(RideResponse rideResponse) {
return rideResponse.getState().equals("finished");//this is the validation finish polling
}
}).filter(new Func1<RideResponse, Boolean>() {
@Override
public Boolean call(RideResponse rideResponse) {
return rideResponse.getState().equals("finished"); //this is the validation finish polling
}
}).map(rideResponse -> Log.e("",rideResponse.toString()))
.doOnError(err -> Log.e("Polling", "Error retrieving messages: " + err));
我有一个 Retrofit 调用,想每 30 秒调用一次。为此,我使用 Observable.interval(0, 30, TimeUnit.SECONDS)
Observable
.interval(0, 30, TimeUnit.SECONDS)
.flatMap(x -> RestApi.instance().getUsers())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(list -> {
// ...
},
error -> Timber.e(error, "can't load users"));
我的问题:如果 api 调用失败,将调用 onError
并且订阅取消订阅并且轮询不再工作:-(
为了捕获 api 错误,我添加了一个 retryWhen
Observable
.interval(0, 30, TimeUnit.SECONDS)
.flatMap(x -> RestApi.instance().getUsers()
.retryWhen(errors -> errors
.flatMap(error -> Observable.timer(15, TimeUnit.SECONDS))))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(list -> {
// ...
},
error -> Timber.e(error, "can't load users"));
这会捕获错误,但我在一段时间内收到多个 api 调用。每 30 秒我都会收到一个新的轮询信号,该信号以新的 api 请求结束。但是,如果 api 请求失败,它会自行重试。所以我有一个新请求加上所有重试。
我的问题:如何在不取消订阅轮询信号的情况下处理 api 错误?
您可以使用 onErrorResumeNext 或 onExceptionResumeNext 并向其传递“错误”值。 您可以根据需要寻找其他错误处理方法here
如果您希望您的 getUsers
请求在请求失败时不以 onError
结束,而不是 returning Observable<yourUserType> getUsers()
,请将其设为 return一个Observable<Response<yourUserType>> getUsers()
。这样您就可以拦截 Response
对象中的网络错误。
此方法仅在您使用改装时有效 2.x
阅读如何正确使用 retryWhen
和 repeatWhen
。
http://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/
以及如何使用 onError
运算符:
http://blog.danlew.net/2015/12/08/error-handling-in-rxjava/
使用 Rx 真的很简单 :) 我不会给你一个最终的解决方案,只是尝试一下并尝试理解这里的流程。
您可以使用这段代码,在这段代码中实现请求之间的尝试次数和时间延迟
private static int COUNTER_START = 1;
private static final int ATTEMPTS = 6;
private static final int ORIGINAL_DELAY_IN_SECONDS = 2;
remoteData.getAllRides(idSearch)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
return observable.flatMap(new Func1<Void, Observable<?>>() {
@Override
public Observable<?> call(Void aVoid) {
if(COUNTER_START > ATTEMPTS){
throw new RuntimeException();
}
COUNTER_START++;
return Observable.timer(ORIGINAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
}
});
}
})
.takeUntil(new Func1<RideResponse, Boolean>() {
@Override
public Boolean call(RideResponse rideResponse) {
return rideResponse.getState().equals("finished");//this is the validation finish polling
}
}).filter(new Func1<RideResponse, Boolean>() {
@Override
public Boolean call(RideResponse rideResponse) {
return rideResponse.getState().equals("finished"); //this is the validation finish polling
}
}).map(rideResponse -> Log.e("",rideResponse.toString()))
.doOnError(err -> Log.e("Polling", "Error retrieving messages: " + err));