RxJava + 改造 + 轮询

RxJava + Retrofit + polling

我有一个 Retrofit 调用,想每 30 秒调用一次。为此,我使用 Observable.interval(0, 30, TimeUnit.SECONDS)

Observable
    .interval(0, 30, TimeUnit.SECONDS)
    .flatMap(x -> RestApi.instance().getUsers())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(list -> {
                    // ...
               },
               error -> Timber.e(error, "can't load users"));

我的问题:如果 api 调用失败,将调用 onError 并且订阅取消订阅并且轮询不再工作:-(

为了捕获 api 错误,我添加了一个 retryWhen

Observable
    .interval(0, 30, TimeUnit.SECONDS)
    .flatMap(x -> RestApi.instance().getUsers()
                         .retryWhen(errors -> errors
                             .flatMap(error -> Observable.timer(15, TimeUnit.SECONDS))))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(list -> {
                   // ...
               },
               error -> Timber.e(error, "can't load users"));

这会捕获错误,但我在一段时间内收到多个 api 调用。每 30 秒我都会收到一个新的轮询信号,该信号以新的 api 请求结束。但是,如果 api 请求失败,它会自行重试。所以我有一个新请求加上所有重试。

我的问题:如何在不取消订阅轮询信号的情况下处理 api 错误?

您可以使用 onErrorResumeNext 或 onExceptionResumeNext 并向其传递“错误”值。 您可以根据需要寻找其他错误处理方法here

如果您希望您的 getUsers 请求在请求失败时不以 onError 结束,而不是 returning Observable<yourUserType> getUsers(),请将其设为 return一个Observable<Response<yourUserType>> getUsers()。这样您就可以拦截 Response 对象中的网络错误。

此方法仅在您使用改装时有效 2.x

阅读如何正确使用 retryWhenrepeatWhenhttp://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/

以及如何使用 onError 运算符: http://blog.danlew.net/2015/12/08/error-handling-in-rxjava/

使用 Rx 真的很简单 :) 我不会给你一个最终的解决方案,只是尝试一下并尝试理解这里的流程。

您可以使用这段代码,在这段代码中实现请求之间的尝试次数和时间延迟

private static int COUNTER_START = 1;
private static final int ATTEMPTS = 6;
private static final int ORIGINAL_DELAY_IN_SECONDS = 2;

remoteData.getAllRides(idSearch)
            .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                        @Override
                        public Observable<?> call(Observable<? extends Void> observable) {
                            return observable.flatMap(new Func1<Void, Observable<?>>() {
                                @Override
                                public Observable<?> call(Void aVoid) {
                                    if(COUNTER_START > ATTEMPTS){
                                        throw new RuntimeException();
                                    }
                                    COUNTER_START++;
                                    return Observable.timer(ORIGINAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
                                }
                            });
                        }
                    })
            .takeUntil(new Func1<RideResponse, Boolean>() {
                @Override
                public Boolean call(RideResponse rideResponse) {
                    return rideResponse.getState().equals("finished");//this is the validation finish polling

                }
            }).filter(new Func1<RideResponse, Boolean>() {
                @Override
                public Boolean call(RideResponse rideResponse) {
                    return rideResponse.getState().equals("finished"); //this is the validation finish polling
                }
            }).map(rideResponse -> Log.e("",rideResponse.toString()))
        .doOnError(err -> Log.e("Polling", "Error retrieving messages: " + err));