在 RxJava 中使用 onErrorReturn 和 retryWhen
Use onErrorReturn with retryWhen in RxJava
代码如下:
import io.reactivex.Observable;
import io.reactivex.Observer;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
public class RxJavaTest {
@Test
public void onErr() {
Observable<String> values1 = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("New");
observer.onNext("New1");
observer.onNext("New2");
observer.onNext("New3");
observer.onNext("New4");
if (ThreadLocalRandom
.current()
.nextInt(10) == 5) {
observer.onError(new Exception("don't retry..."));
} else {
observer.onError(new IllegalArgumentException("retry..."));
}
}
};
AtomicBoolean finished = new AtomicBoolean(false);
values1
.retryWhen(throwableObservable -> throwableObservable
.takeWhile(throwable -> {
boolean result = (throwable instanceof IllegalArgumentException);
if (result) {
System.out.println("Retry on error: " + throwable);
return result;
}
System.out.println("Error: " + throwable);
return result;
})
.take(20))
.onErrorReturn(throwable -> "Saved the day!")
.doOnTerminate(() -> finished.set(true))
.subscribe(v -> System.out.println(v));
}
}
目标是
- 仅当存在
IllegalArgumentException
、 时重试
- 对于任何其他异常,立即return(通过
onErrorReturn
)。
上面的代码完成了第一个目标,但在第二个目标上失败了,它停止重试,但忽略了 .onErrorReturn
部分。
知道如何让它发挥作用吗?
您可以将 retryWhen
更改为:
.retryWhen(throwableObservable ->
throwableObservable.flatMap(throwable -> {
if (throwable instanceof IllegalArgumentException) {
System.out.println("Retry on error: " + throwable);
return Observable.just(1);
} else {
System.out.println("Error: " + throwable);
return Observable.<Integer>error(throwable);
}
})
)
为了重试,retryWhen
中 return 的值并不重要(在上面的示例中是 returning 1)。根据 javadoc:
If that ObservableSource calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this ObservableSource will resubscribe to the source ObservableSource.
郑重声明,这是我在使用 onErrorResumeNext
:
看到 Gustavo 的回答之前的解决方案
private Observable<String> createObservable(long delay) {
Observable<String> values1 = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("New");
observer.onNext("New1");
observer.onNext("New2");
observer.onNext("New3");
observer.onNext("New4");
if (ThreadLocalRandom
.current()
.nextInt(8) == 2) {
observer.onError(new RuntimeException("don't retry..."));
} else {
observer.onError(new IllegalArgumentException("retry..."));
}
}
};
return Observable.timer(delay, TimeUnit.SECONDS).flatMap(aLong -> values1)
.onErrorResumeNext((Throwable throwable) -> {
if (throwable instanceof IllegalArgumentException) {
return createObservable(delay + 2);
} else {
return Observable.just("The default value");
}
});
}
这按预期工作,但我认为古斯塔沃建议的方式更容易理解。这是使用 retryWhen
:
重写的相同函数
private Observable<String> createObservable1() {
Observable<String> values1 = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("New");
observer.onNext("New1");
observer.onNext("New2");
observer.onNext("New3");
observer.onNext("New4");
if (ThreadLocalRandom
.current()
.nextInt(3) == 1) {
observer.onError(new RuntimeException("don't retry..."));
} else {
observer.onError(new IllegalArgumentException("retry..."));
}
}
};
return values1.retryWhen(throwableObservable ->
throwableObservable
.zipWith(Observable.range(1, 5), (throwable, integer) -> {
if (throwable instanceof IllegalArgumentException) {
System.out.println("Retry on error: " + throwable);
return integer;
}
System.out.println("No retry on error: " + throwable);
return -1;
})
.flatMap(integer -> {
if (integer > 0) {
System.out.println("Delay " + integer + " sec on retry...");
return Observable.timer(integer, TimeUnit.SECONDS);
}
System.out.println("Return immediately...");
return Observable.error(new Exception());
})
).onErrorReturnItem("Saved the day!");
}
希望这对您有所帮助。
代码如下:
import io.reactivex.Observable;
import io.reactivex.Observer;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
public class RxJavaTest {
@Test
public void onErr() {
Observable<String> values1 = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("New");
observer.onNext("New1");
observer.onNext("New2");
observer.onNext("New3");
observer.onNext("New4");
if (ThreadLocalRandom
.current()
.nextInt(10) == 5) {
observer.onError(new Exception("don't retry..."));
} else {
observer.onError(new IllegalArgumentException("retry..."));
}
}
};
AtomicBoolean finished = new AtomicBoolean(false);
values1
.retryWhen(throwableObservable -> throwableObservable
.takeWhile(throwable -> {
boolean result = (throwable instanceof IllegalArgumentException);
if (result) {
System.out.println("Retry on error: " + throwable);
return result;
}
System.out.println("Error: " + throwable);
return result;
})
.take(20))
.onErrorReturn(throwable -> "Saved the day!")
.doOnTerminate(() -> finished.set(true))
.subscribe(v -> System.out.println(v));
}
}
目标是
- 仅当存在
IllegalArgumentException
、 时重试
- 对于任何其他异常,立即return(通过
onErrorReturn
)。
上面的代码完成了第一个目标,但在第二个目标上失败了,它停止重试,但忽略了 .onErrorReturn
部分。
知道如何让它发挥作用吗?
您可以将 retryWhen
更改为:
.retryWhen(throwableObservable ->
throwableObservable.flatMap(throwable -> {
if (throwable instanceof IllegalArgumentException) {
System.out.println("Retry on error: " + throwable);
return Observable.just(1);
} else {
System.out.println("Error: " + throwable);
return Observable.<Integer>error(throwable);
}
})
)
为了重试,retryWhen
中 return 的值并不重要(在上面的示例中是 returning 1)。根据 javadoc:
If that ObservableSource calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this ObservableSource will resubscribe to the source ObservableSource.
郑重声明,这是我在使用 onErrorResumeNext
:
private Observable<String> createObservable(long delay) {
Observable<String> values1 = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("New");
observer.onNext("New1");
observer.onNext("New2");
observer.onNext("New3");
observer.onNext("New4");
if (ThreadLocalRandom
.current()
.nextInt(8) == 2) {
observer.onError(new RuntimeException("don't retry..."));
} else {
observer.onError(new IllegalArgumentException("retry..."));
}
}
};
return Observable.timer(delay, TimeUnit.SECONDS).flatMap(aLong -> values1)
.onErrorResumeNext((Throwable throwable) -> {
if (throwable instanceof IllegalArgumentException) {
return createObservable(delay + 2);
} else {
return Observable.just("The default value");
}
});
}
这按预期工作,但我认为古斯塔沃建议的方式更容易理解。这是使用 retryWhen
:
private Observable<String> createObservable1() {
Observable<String> values1 = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("New");
observer.onNext("New1");
observer.onNext("New2");
observer.onNext("New3");
observer.onNext("New4");
if (ThreadLocalRandom
.current()
.nextInt(3) == 1) {
observer.onError(new RuntimeException("don't retry..."));
} else {
observer.onError(new IllegalArgumentException("retry..."));
}
}
};
return values1.retryWhen(throwableObservable ->
throwableObservable
.zipWith(Observable.range(1, 5), (throwable, integer) -> {
if (throwable instanceof IllegalArgumentException) {
System.out.println("Retry on error: " + throwable);
return integer;
}
System.out.println("No retry on error: " + throwable);
return -1;
})
.flatMap(integer -> {
if (integer > 0) {
System.out.println("Delay " + integer + " sec on retry...");
return Observable.timer(integer, TimeUnit.SECONDS);
}
System.out.println("Return immediately...");
return Observable.error(new Exception());
})
).onErrorReturnItem("Saved the day!");
}
希望这对您有所帮助。