在 RxJava 中使用 onErrorReturn 和 retryWhen

Use onErrorReturn with retryWhen in RxJava

代码如下:

import io.reactivex.Observable;
import io.reactivex.Observer;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

public class RxJavaTest {

    @Test
    public void onErr() {

        Observable<String> values1 = new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext("New");
                observer.onNext("New1");
                observer.onNext("New2");
                observer.onNext("New3");
                observer.onNext("New4");
                if (ThreadLocalRandom
                            .current()
                            .nextInt(10) == 5) {
                    observer.onError(new Exception("don't retry..."));
                } else {
                    observer.onError(new IllegalArgumentException("retry..."));
                }
            }
        };
        AtomicBoolean finished = new AtomicBoolean(false);
        values1
                .retryWhen(throwableObservable -> throwableObservable
                        .takeWhile(throwable -> {
                            boolean result = (throwable instanceof IllegalArgumentException);
                            if (result) {
                                System.out.println("Retry on error: " + throwable);
                                return result;
                            }
                            System.out.println("Error: " + throwable);
                            return result;
                        })
                        .take(20))
                .onErrorReturn(throwable -> "Saved the day!")
                .doOnTerminate(() -> finished.set(true))
                .subscribe(v -> System.out.println(v));
    }
}

目标是

上面的代码完成了第一个目标,但在第二个目标上失败了,它停止重试,但忽略了 .onErrorReturn 部分。

知道如何让它发挥作用吗?

您可以将 retryWhen 更改为:

                .retryWhen(throwableObservable ->
                                throwableObservable.flatMap(throwable -> {
                                    if (throwable instanceof IllegalArgumentException) {
                                        System.out.println("Retry on error: " + throwable);
                                        return Observable.just(1);
                                    } else {
                                        System.out.println("Error: " + throwable);
                                        return Observable.<Integer>error(throwable);
                                    }
                                })
                )

为了重试,retryWhen 中 return 的值并不重要(在上面的示例中是 returning 1)。根据 javadoc:

If that ObservableSource calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this ObservableSource will resubscribe to the source ObservableSource.

郑重声明,这是我在使用 onErrorResumeNext:

看到 Gustavo 的回答之前的解决方案
    private Observable<String> createObservable(long delay) {
        Observable<String> values1 = new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext("New");
                observer.onNext("New1");
                observer.onNext("New2");
                observer.onNext("New3");
                observer.onNext("New4");
                if (ThreadLocalRandom
                        .current()
                        .nextInt(8) == 2) {
                    observer.onError(new RuntimeException("don't retry..."));
                } else {
                    observer.onError(new IllegalArgumentException("retry..."));
                }
            }
        };
        return Observable.timer(delay, TimeUnit.SECONDS).flatMap(aLong -> values1)
                .onErrorResumeNext((Throwable throwable) -> {
                    if (throwable instanceof IllegalArgumentException) {
                        return createObservable(delay + 2);
                    } else {
                        return Observable.just("The default value");
                    }
                });
    }

这按预期工作,但我认为古斯塔沃建议的方式更容易理解。这是使用 retryWhen:

重写的相同函数
    private Observable<String> createObservable1() {
        Observable<String> values1 = new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext("New");
                observer.onNext("New1");
                observer.onNext("New2");
                observer.onNext("New3");
                observer.onNext("New4");
                if (ThreadLocalRandom
                        .current()
                        .nextInt(3) == 1) {
                    observer.onError(new RuntimeException("don't retry..."));
                } else {
                    observer.onError(new IllegalArgumentException("retry..."));
                }
            }
        };
        return values1.retryWhen(throwableObservable ->
                throwableObservable
                        .zipWith(Observable.range(1, 5), (throwable, integer) -> {
                            if (throwable instanceof IllegalArgumentException) {
                                System.out.println("Retry on error: " + throwable);
                                return integer;
                            }
                            System.out.println("No retry on error: " + throwable);
                            return -1;
                        })
                        .flatMap(integer -> {
                            if (integer > 0) {
                                System.out.println("Delay " + integer + " sec on retry...");
                                return Observable.timer(integer, TimeUnit.SECONDS);
                            }
                            System.out.println("Return immediately...");
                            return Observable.error(new Exception());
                        })
        ).onErrorReturnItem("Saved the day!");
    }

希望这对您有所帮助。