如何断言 Completable 是否已经 subscribed/completed (RxJava2)

How to assert if a Completable has been subscribed/completed (RxJava2)

我无法确定 Completable 是否已在我的测试中订阅,例如:

interface IWebApi {
    Observable<Data> download();
}

interface IDataRepository {
    Completable insert(Data data);
}

class SyncService {

    IWebApi webApi;
    IDataRepository repository;

    public SyncService(IWebApi webApi, IDataRepository repository) {
        this.webApi = webApi;
        this.repository = repository;
    }

    public Completable sync() {
        return webApi.download()
            .flatMapCompletable((Data data) -> { repository.insert(data) })
    }
}

然后在我的测试中:

@Test
public void syncTest() {
    Data data = new Data();
    IDataRepository repository = mock (IDataRepository.class);
    IWebApi webApi = mock (IWebApi.class);

    given(webApi.download()).willReturn(Observable.just(data));
    given(repository.insert(data)).willReturn(Completable.complete());

    TestObserver<Void> observer = new TestObserver<Void>();
    SyncService service = new SyncService(webApi, repository);
    service.sync()
            .subscribe(observer);

    observer.assertComplete();
    verify(repository).insert(data);
}

这个测试会通过。但是我可以在不使用 flatMapCompletable 的情况下重写同步方法,如下所示:

    public Completable sync() {
        return webApi.download()
            .doOnNext((Data data) -> { repository.insert(data) })
            .ignoreElements();
    }

然后我的测试会通过,但代码将无法运行,因为即使我认为我已经调用了插入方法,但我没有在其中调用 subscribe()

我该如何处理?

P.S 我是 RxJava 的新手,所以如果我没有使用最佳实践,我很想知道:)

更新

修复了Maxim Ostrovidov

指出的没有调用.ingnoreElements()的错误

为了方便起见,您可以使用 test() 运算符:

SyncService service = new SyncService(webApi, repository);
TestObserver observer = service.sync().test();

But I Could rewrite the sync method without using flatMapCompletable like this:

public Completable sync() {
    return webApi.download()
        .doOnNext((Data data) -> { repository.insert(data) })
}

这不会编译,因为 doOnNext 仅用于调用对项目的操作,而不是更改流 return 类型。在您的情况下,方法期望 Completable 但实际上它将是 Observable<Data>.

即使您强制更改最终流类型:

public Completable sync() {
    return webApi.download()
        .doOnNext((Data data) -> { repository.insert(data) })
        .ignoreElements(); //converts to Completable
}

repository.insert(data) 不会被调用,因为 doOnNext 没有订阅您传递的任何内容,也没有 returning 任何内容:

//under the hood of lambdas
.doOnNext(new Consumer<Data>() {
    @Override
    public void accept(Data data) throws Exception {

    }
})

您的初始代码最适合您要实现的目标:

public Completable sync() {
    return webApi.download()
        .flatMapCompletable((Data data) -> { repository.insert(data) })
}

flatMapCompletable 使用 Observable:

发出的项目订阅传递的 Completable
.flatMapCompletable(new Function<Data, CompletableSource>() {
    @Override
    public CompletableSource apply(Data data) throws Exception {
        return repository.insert(data);
    }
})

编辑

要测试 repository.insert 订阅的事实,您可以使用另一个 TestObserverCompletable 并应用 doOnSubscribe:

TestObserver repositoryInsertObserver = TestObserver.create();
Completable insertCompletable = Completable.complete()
    .doOnSubscribe(d -> repositoryInsertObserver.onSubscribe(d));

//pass completable to your mock
given(repository.insert(data)).willReturn(insertCompletable);

//and verify that subscription took place
repositoryInsertObserver.assertSubscribed();