RxJava 的 doOnError 和 onErrorReturn 是如何工作的?

How does RxJava doOnError and onErrorReturn work?

我做了这些单元测试,结果完全不是我所期望的:

// This one outputs "subscribe.onError" 
@Test
public void observable_doOnError_subscribingToError() throws InterruptedException {
    Observable<String> obs = getErrorProducingObservable();
    obs.doOnError(throwable -> System.out.println("doOnError"));
    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> {},
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

// This one outputs "subscribe.onError" 
@Test
public void observable_onErrorReturn() throws InterruptedException {
    Observable<String> obs = getErrorProducingObservable();
    obs.onErrorReturn(throwable -> "Yeah I got this");
    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> System.out.println("got: " + s),
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

private Observable<String> getErrorProducingObservable()  {
    return Observable.create(subscriber -> {
        subscriber.onError(new RuntimeException("Somebody set up us the bomb"));
    });
}

所以两个输出 "subscribe.onError" - doOnErroronErrorReturn 似乎都没有被调用。

doOnError记录为:

Modifies the source Observable so that it invokes an action if it calls onError.

我不确定如何解释,但我希望输出 "doOnError" 或 "doOnError" 后跟 "subscribe.onError"。

onErrorReturn 记录为:

Instructs an Observable to emit an item (returned by a specified function) rather than invoking onError if it encounters an error.

因此我期待 "got: Yeah I got this" 作为后一个测试的输出。

什么给了?

doOnErroronErrorReturn returns 一个新的 Observable 改变了行为。我同意它们的文档可能有点误导。像这样修改您的测试以获得预期的行为:

// This one outputs "subscribe.onError" 
@Test
public void observable_doOnError_subscribingToError() throws InterruptedException {
    Observable<String> obs = 
        getErrorProducingObservable()
            .doOnError(throwable -> System.out.println("doOnError"));

    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> {},
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

// This one outputs "subscribe.onError" 
@Test
public void observable_onErrorReturn() throws InterruptedException {
    Observable<String> obs = 
        getErrorProducingObservable()
            .onErrorReturn(throwable -> "Yeah I got this");

    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> System.out.println("got: " + s),
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

private Observable<String> getErrorProducingObservable()  {
    return Observable.create(subscriber -> {
        subscriber.onError(new RuntimeException("Somebody set up us the bomb"));
    });
}