重试/延迟重复在 RxJava2 中不起作用
retry / repeat with delay not working in RxJava2
我正在升级到 rxjava2,我们有从服务器轮询数据的代码,当出现网络问题时,代码处理延迟重试。但是,不知何故,当我尝试迁移到 rxjava2 时,代码停止工作。
这里是Rxjava1的代码,完美运行,基本上就是这样http://blog.danlew.net/2015/03/02/dont-break-the-chain/ and this https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
// wrap a flatmap here so that i can check the exception type
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
mThrowable = throwable;
if (throwable instanceof IOException) {
return observable.compose(timerWithRetries());
} else {
// for other errors, call onError and exit
return Observable.error(throwable);
}
}
});
}
})
private <T> Observable.Transformer<T, Long> timerWithRetries() {
return new Observable.Transformer<T, Long>() {
@Override
public Observable<Long> call(Observable<T> observable) {
return observable
.zipWith(Observable.range(COUNTER_START, MAX_RETRIES + 1),
new Func2<T, Integer, Integer>() {
@Override
public Integer call(T t, Integer repeatAttempt) {
return repeatAttempt;
}
})
.flatMap(new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer repeatAttempt) {
if (repeatAttempt == MAX_RETRIES + 1) {
if (mThrowable instanceof IOException) {
// Custom Exception
throw new Exception();
}
}
// increase the waiting time
return Observable.timer(repeatAttempt * mDelaySeconds, TimeUnit.SECONDS);
}
});
}
};
}
我想用 flatmap 包装错误,以便我可以检查异常类型,当它达到最大重试次数时,我可以将我的自定义异常传递给 onError。
但是,当只使用Rxjava2时,timerWithRetries()方法停止工作,调用了该方法,但没有执行.zipWith()及其平面图。
但它在包装错误时没有平面图工作,这很奇怪。像
.retryWhen(error -> error.compose(timerWithRetries()))
非常感谢任何建议!
1.x retryWhen
使用 BehaviorSubject
保留最后一个 Throwable 并在有新订阅者时重播。这主要是由于它的 "weird" 实现试图支持大多数 retry
和 repeat
运算符。
2.x 使用 PublishSubject
并且通常只订阅一次(不会重新组合)。只有发生故障时的观察者才会收到错误值,但不会收到错误发出后立即出现的任何内容。
实际上,observable.compose(timerWithRetries());
并不完全正确,因为您一直在向主题添加观察者而没有清除之前的观察者。
最后一个案例之所以可行,是因为您在主要错误源上构建了一个 counted-flatMapped 处理程序,该处理程序作为对原始错误的响应发出。
我终于找到了解决方法。使用 delay() 而不是使用 zipWith() 和 flatmap()。
AtomicInteger retryCounter = new AtomicInteger(0);
.retryWhen(error -> error.flatmap(e -> {
if (e instanceof HttpException) {
// code that deals with specific exception
int retries = retryCounter.increaseAndGet();
if (retries < MAX_RETRIES) {
// Key point here, uses .delay()
return Observable.just(new Object()).delay(delaySeconds, SECOND);
}
}
}))
我正在升级到 rxjava2,我们有从服务器轮询数据的代码,当出现网络问题时,代码处理延迟重试。但是,不知何故,当我尝试迁移到 rxjava2 时,代码停止工作。 这里是Rxjava1的代码,完美运行,基本上就是这样http://blog.danlew.net/2015/03/02/dont-break-the-chain/ and this https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
// wrap a flatmap here so that i can check the exception type
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
mThrowable = throwable;
if (throwable instanceof IOException) {
return observable.compose(timerWithRetries());
} else {
// for other errors, call onError and exit
return Observable.error(throwable);
}
}
});
}
})
private <T> Observable.Transformer<T, Long> timerWithRetries() {
return new Observable.Transformer<T, Long>() {
@Override
public Observable<Long> call(Observable<T> observable) {
return observable
.zipWith(Observable.range(COUNTER_START, MAX_RETRIES + 1),
new Func2<T, Integer, Integer>() {
@Override
public Integer call(T t, Integer repeatAttempt) {
return repeatAttempt;
}
})
.flatMap(new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer repeatAttempt) {
if (repeatAttempt == MAX_RETRIES + 1) {
if (mThrowable instanceof IOException) {
// Custom Exception
throw new Exception();
}
}
// increase the waiting time
return Observable.timer(repeatAttempt * mDelaySeconds, TimeUnit.SECONDS);
}
});
}
};
}
我想用 flatmap 包装错误,以便我可以检查异常类型,当它达到最大重试次数时,我可以将我的自定义异常传递给 onError。
但是,当只使用Rxjava2时,timerWithRetries()方法停止工作,调用了该方法,但没有执行.zipWith()及其平面图。
但它在包装错误时没有平面图工作,这很奇怪。像
.retryWhen(error -> error.compose(timerWithRetries()))
非常感谢任何建议!
1.x retryWhen
使用 BehaviorSubject
保留最后一个 Throwable 并在有新订阅者时重播。这主要是由于它的 "weird" 实现试图支持大多数 retry
和 repeat
运算符。
2.x 使用 PublishSubject
并且通常只订阅一次(不会重新组合)。只有发生故障时的观察者才会收到错误值,但不会收到错误发出后立即出现的任何内容。
实际上,observable.compose(timerWithRetries());
并不完全正确,因为您一直在向主题添加观察者而没有清除之前的观察者。
最后一个案例之所以可行,是因为您在主要错误源上构建了一个 counted-flatMapped 处理程序,该处理程序作为对原始错误的响应发出。
我终于找到了解决方法。使用 delay() 而不是使用 zipWith() 和 flatmap()。
AtomicInteger retryCounter = new AtomicInteger(0);
.retryWhen(error -> error.flatmap(e -> {
if (e instanceof HttpException) {
// code that deals with specific exception
int retries = retryCounter.increaseAndGet();
if (retries < MAX_RETRIES) {
// Key point here, uses .delay()
return Observable.just(new Object()).delay(delaySeconds, SECOND);
}
}
}))