RxJava2:即使第一个 Completable 发出错误,.andThen() 也会被调用

RxJava2: .andThen() called even though first Completable emitted error

我在一个方法中有 2 个可完成项,例如:

completable1.doSomething()
    .andThen(completable2.doSomethingElse())

我想通过使用 Mockito 并在第一个可完成项中返回错误来测试它。这是我原来的方法:

 public Completable updateQuestion(Question question){
    return gameRepositoryType.updateQuestion(question)
            .andThen(gameRepositoryType.getGameById(question.getqId())
                    .map(GAME_MAPPER)
                    .flatMapCompletable(gameRepositoryType::updateGame))
            .observeOn(AndroidSchedulers.mainThread());
}

这是我的测试:

@Test
public void updateQuestionError(){
    when(gameRepositoryType.updateQuestion(question)).thenReturn(Completable.error(error));
    when(question.getqId()).thenReturn(FAKE_QID);
    when(gameRepositoryType.getGameById(FAKE_QID)).thenReturn(Single.just(game));
    when(gameRepositoryType.updateGame(game)).thenReturn(Completable.complete());

    interactor.updateQuestion(question)
            .test()
            .assertNotComplete()
            .assertError(error);

    verify(gameRepositoryType).updateQuestion(question);       //this is called in test
    verify(question,never()).getqId();                         //this is called in test
    verify(gameRepositoryType,never()).getGameById(FAKE_QID);  //this is called in test
    verify(gameRepositoryType,never()).updateGame(game);       //this is NEVER called in test
}

在测试方法结束时的验证中,我期望在第一个可完成的错误发出后,最后 3 次验证应该 运行 成功,即最后 3 次 verify() 应该永远不会被调用。

但是我可以看到所有方法都被调用,即使 gameRepositoryType.updateQuestion(question) 发出错误。这是为什么?

如果第一个可完成的错误发出错误,andThen() 不应该被调用吗?

completable1.doSomething()
    .andThen(completable2.doSomethingElse())

Shouldn't andThen() never be called if the first completable emitts error?

andThen 将始终调用 completable2.doSomethingElse() 来构建流,但它返回的 Completable 永远不会被订阅。

你可以这样测试:

when(completable1.doSomething()).thenReturn(Completable.error(error));

CompletableSubject completableSubject = CompletableSubject.create();
when(completable2.doSomethingElse()).thenReturn(completableSubject);


TestObserver testObserver = completable1.doSomething()
    .andThen(completable2.doSomethingElse()).test();

// Verify that completable2.doSomethingElse() was never subscribed to:
assertFalse(completableSubject.hasObservers());

testObserver
    .assertError(error)
    .assertNotComplete();

编辑:进一步说明:

鉴于您的方法定义如下:

private Completable updateQuestion(Question question) {
    System.out.println("prints when the updateQuestion is called");
    return Completable.fromAction(() -> {
        System.out.println("prints when updateQuestion is subscribed to");
        database.updateQuestion(question);
    });
}

private Completable updateGame() {
    System.out.println("prints when the updateGame is called");
    return Completable.fromAction(() -> {
        System.out.println("prints when updateGame is subscribed to");
        database.updateGame();
    });
}

假设您调用:

updateQuestion(question)
    .andThen(updateGame())
    .subscribe();

并且 database.updateQuestion(question); 抛出错误。

那么您的日志输出将是:

prints when the updateQuestion is called
prints when the updateGame is called
prints when updateQuestion is subscribed to
>> error 

database.updateGame();永远不会被执行

如果有人遇到同样的问题,请尝试 defer 第二个 Completable。

抽象示例:

repo.executeCompletable1()
                .andThen(
                    Completable.defer {
                        repo.executeCompletable2()
                    }
                )