RxJava:令人费解的行为

RxJava: puzzling behavior

我想要 class 中的方法到 运行 IO 线程上的一些代码,但只有一次他们订阅的 Subject 具有特定值。然后调用者应该在 Android UI 线程上得到响应。

像这样:

public class MyClass {

  private final Subject<Boolean, Boolean> subject;
  private final OtherClass otherObject;

  public MyClass(Subject<Boolean, Boolean> subject,
      OtherClass otherObject) {
    this.subject = subject;
    this.otherObject = otherObject;
  }

  public Observable<String> myMethod() {
    return waitForTrue(() -> otherObject.readFromDisk());
  }

  private <T> Observable<T> waitForTrue(Callable<T> callable) {
    return subject
        .first(value -> value)
        .flatMap(value -> Observable.fromCallable(callable))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
  }
}

这个有用吗?不确定,所以我写了一组单元测试来检查它们。我发现我的测试方法,虽然它们总是在 运行 一个接一个地工作,但作为套件的一部分会失败。

事实上,我发现如果我对同一个测试进行两次,第一次会通过,但第二次会失败!

public class MyClassTest {

  private TestScheduler ioScheduler;
  private TestScheduler androidScheduler;
  private TestSubscriber<String> testSubscriber;
  private MyClass objectUnderTest;

  @Before public void setup() {
    ioScheduler = new TestScheduler();
    androidScheduler = new TestScheduler();
    testSubscriber = new TestSubscriber<>();
    RxJavaHooks.reset();
    RxJavaHooks.setOnIOScheduler(scheduler -> ioScheduler);
    RxAndroidPlugins.getInstance().reset();
    RxAndroidPlugins.getInstance().registerSchedulersHook(
        new RxAndroidSchedulersHook() {
          @Override public Scheduler getMainThreadScheduler() {
            return androidScheduler;
          };
        });
    Subject<Boolean, Boolean> subject = BehaviorSubject.create(true);
    MyClass.OtherClass otherClass = mock(MyClass.OtherClass.class);
    when(otherClass.readFromDisk()).thenReturn("mike");;
    objectUnderTest = new MyClass(subject, otherClass);
  };

  @Test public void firstTest() {
    objectUnderTest.myMethod().subscribe(testSubscriber);
    ioScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    androidScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    testSubscriber.assertValueCount(1);
    // This passes
  };

  @Test public void secondTest() {
    firstTest();
    // This fails!
  };
}

为什么会这样?并且是测试中class的bug,还是测试代码?

我认为这可能是使用 RxJava 的问题 1.x,但我对 RxJava 也有类似的问题 2.x。

编辑: 由于测试代码中缺少一行,测试失败。你必须把它放在设置方法中:

AndroidSchedulers.reset()

因为挂钩只被调用一次,由 AndroidSchedulers class.

的静态初始化器调用

subscribeOnSubject 没有实际影响,因为它们没有订阅副作用来转移到另一个线程。因此,当他们得到一个新项目时,他们会在调用者线程上通知他们的消费者。将项目移动到另一个线程应该通过 observeOn:

private <T> Observable<T> waitForTrue(Callable<T> callable) {
    return subject
    .filter(value -> value)
    .take(1)
    .observeOn(Schedulers.io())
    .map(value -> callable.call())
    .observeOn(AndroidSchedulers.mainThread());
}

此外,您不需要 flatMap 仅执行 callable,映射就足够了。