重试/延迟重复在 RxJava2 中不起作用

retry / repeat with delay not working in RxJava2

我正在升级到 rxjava2,我们有从服务器轮询数据的代码,当出现网络问题时,代码处理延迟重试。但是,不知何故,当我尝试迁移到 rxjava2 时,代码停止工作。 这里是Rxjava1的代码,完美运行,基本上就是这样http://blog.danlew.net/2015/03/02/dont-break-the-chain/ and this https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                @Override
                public Observable<?> call(final Observable<? extends Throwable> observable) {
                    // wrap a flatmap here so that i can check the exception type
                    return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                        @Override
                        public Observable<?> call(Throwable throwable) {
                            mThrowable = throwable;
                            if (throwable instanceof IOException) {
                                return observable.compose(timerWithRetries());
                            } else {
                                // for other errors, call onError and exit
                                return Observable.error(throwable);
                            }
                        }
                    });
                }
            })



private <T> Observable.Transformer<T, Long> timerWithRetries() {
    return new Observable.Transformer<T, Long>() {

        @Override
        public Observable<Long> call(Observable<T> observable) {
            return observable
                    .zipWith(Observable.range(COUNTER_START, MAX_RETRIES + 1),
                            new Func2<T, Integer, Integer>() {
                                @Override
                                public Integer call(T t, Integer repeatAttempt) {
                                    return repeatAttempt;
                                }
                            })
                    .flatMap(new Func1<Integer, Observable<Long>>() {
                        @Override
                        public Observable<Long> call(Integer repeatAttempt) {
                            if (repeatAttempt == MAX_RETRIES + 1) {
                                if (mThrowable instanceof IOException) {
                                  // Custom Exception
                                   throw new Exception();
                                }
                            }
                            // increase the waiting time
                            return Observable.timer(repeatAttempt * mDelaySeconds, TimeUnit.SECONDS);
                        }
                    });
        }
    };
}

我想用 flatmap 包装错误,以便我可以检查异常类型,当它达到最大重试次数时,我可以将我的自定义异常传递给 onError。

但是,当只使用Rxjava2时,timerWithRetries()方法停止工作,调用了该方法,但没有执行.zipWith()及其平面图。

但它在包装错误时没有平面图工作,这很奇怪。像

.retryWhen(error -> error.compose(timerWithRetries()))

非常感谢任何建议!

1.x retryWhen 使用 BehaviorSubject 保留最后一个 Throwable 并在有新订阅者时重播。这主要是由于它的 "weird" 实现试图支持大多数 retryrepeat 运算符。

2.x 使用 PublishSubject 并且通常只订阅一次(不会重新组合)。只有发生故障时的观察者才会收到错误值,但不会收到错误发出后立即出现的任何内容。

实际上,observable.compose(timerWithRetries()); 并不完全正确,因为您一直在向主题添加观察者而没有清除之前的观察者。

最后一个案例之所以可行,是因为您在主要错误源上构建了一个 counted-flatMapped 处理程序,该处理程序作为对原始错误的响应发出。

我终于找到了解决方法。使用 delay() 而不是使用 zipWith() 和 flatmap()。

 AtomicInteger retryCounter = new AtomicInteger(0);

.retryWhen(error -> error.flatmap(e -> {
     if (e instanceof HttpException) {
         // code that deals with specific exception
        int retries = retryCounter.increaseAndGet();
        if (retries < MAX_RETRIES) {
            // Key point here, uses .delay()
            return Observable.just(new Object()).delay(delaySeconds, SECOND);
        }
     }
 }))