测试 observable 仅评估一次

Test observable evaluated only once

我们正在使用 rx-java,但是我们有一个问题来测试 Observable 是否只被评估一次。

我们有以下服务签名:

class A {
  Observable<String> computeA();
}

class B {
  Observable<Void> computeB(String inputA)
}

class C {
  Observable<Void> computeC(String inputA)
}

在我们的例子中,我们有一个调用服务 A 的服务,我们使用该服务的结果作为另外两个服务 B 和 C 的输入。我们要检查服务 A 是否仅被调用一次。

我们尝试使用 AtomicInteger 作为计数器来模拟服务 A 的 Observable 或只是模拟 Observable.OnSubscribe 检查调用调用的时间,但测试变得非常难以阅读。

问题是:有没有更简洁的方法来进行这种测试?

我们通常通过添加

source
.doOnSubscribe(() -> counter.getAndIncrement())
.op()
.op()
...

在需要的地方,在事情安静下来后检查计数器值。

请注意,我们有一个名为 publish() 的运算符,它确保一个源被订阅一次并且它的所有副作用只发生一次(只要 connect() 被调用一次)。