RxJava:令人费解的行为
RxJava: puzzling behavior
我想要 class 中的方法到 运行 IO 线程上的一些代码,但只有一次他们订阅的 Subject 具有特定值。然后调用者应该在 Android UI 线程上得到响应。
像这样:
public class MyClass {
private final Subject<Boolean, Boolean> subject;
private final OtherClass otherObject;
public MyClass(Subject<Boolean, Boolean> subject,
OtherClass otherObject) {
this.subject = subject;
this.otherObject = otherObject;
}
public Observable<String> myMethod() {
return waitForTrue(() -> otherObject.readFromDisk());
}
private <T> Observable<T> waitForTrue(Callable<T> callable) {
return subject
.first(value -> value)
.flatMap(value -> Observable.fromCallable(callable))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
}
这个有用吗?不确定,所以我写了一组单元测试来检查它们。我发现我的测试方法,虽然它们总是在 运行 一个接一个地工作,但作为套件的一部分会失败。
事实上,我发现如果我对同一个测试进行两次,第一次会通过,但第二次会失败!
public class MyClassTest {
private TestScheduler ioScheduler;
private TestScheduler androidScheduler;
private TestSubscriber<String> testSubscriber;
private MyClass objectUnderTest;
@Before public void setup() {
ioScheduler = new TestScheduler();
androidScheduler = new TestScheduler();
testSubscriber = new TestSubscriber<>();
RxJavaHooks.reset();
RxJavaHooks.setOnIOScheduler(scheduler -> ioScheduler);
RxAndroidPlugins.getInstance().reset();
RxAndroidPlugins.getInstance().registerSchedulersHook(
new RxAndroidSchedulersHook() {
@Override public Scheduler getMainThreadScheduler() {
return androidScheduler;
};
});
Subject<Boolean, Boolean> subject = BehaviorSubject.create(true);
MyClass.OtherClass otherClass = mock(MyClass.OtherClass.class);
when(otherClass.readFromDisk()).thenReturn("mike");;
objectUnderTest = new MyClass(subject, otherClass);
};
@Test public void firstTest() {
objectUnderTest.myMethod().subscribe(testSubscriber);
ioScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
androidScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testSubscriber.assertValueCount(1);
// This passes
};
@Test public void secondTest() {
firstTest();
// This fails!
};
}
为什么会这样?并且是测试中class的bug,还是测试代码?
我认为这可能是使用 RxJava 的问题 1.x,但我对 RxJava 也有类似的问题 2.x。
编辑: 由于测试代码中缺少一行,测试失败。你必须把它放在设置方法中:
AndroidSchedulers.reset()
因为挂钩只被调用一次,由 AndroidSchedulers
class.
的静态初始化器调用
subscribeOn
对 Subject
没有实际影响,因为它们没有订阅副作用来转移到另一个线程。因此,当他们得到一个新项目时,他们会在调用者线程上通知他们的消费者。将项目移动到另一个线程应该通过 observeOn
:
private <T> Observable<T> waitForTrue(Callable<T> callable) {
return subject
.filter(value -> value)
.take(1)
.observeOn(Schedulers.io())
.map(value -> callable.call())
.observeOn(AndroidSchedulers.mainThread());
}
此外,您不需要 flatMap
仅执行 callable
,映射就足够了。
我想要 class 中的方法到 运行 IO 线程上的一些代码,但只有一次他们订阅的 Subject 具有特定值。然后调用者应该在 Android UI 线程上得到响应。
像这样:
public class MyClass {
private final Subject<Boolean, Boolean> subject;
private final OtherClass otherObject;
public MyClass(Subject<Boolean, Boolean> subject,
OtherClass otherObject) {
this.subject = subject;
this.otherObject = otherObject;
}
public Observable<String> myMethod() {
return waitForTrue(() -> otherObject.readFromDisk());
}
private <T> Observable<T> waitForTrue(Callable<T> callable) {
return subject
.first(value -> value)
.flatMap(value -> Observable.fromCallable(callable))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
}
这个有用吗?不确定,所以我写了一组单元测试来检查它们。我发现我的测试方法,虽然它们总是在 运行 一个接一个地工作,但作为套件的一部分会失败。
事实上,我发现如果我对同一个测试进行两次,第一次会通过,但第二次会失败!
public class MyClassTest {
private TestScheduler ioScheduler;
private TestScheduler androidScheduler;
private TestSubscriber<String> testSubscriber;
private MyClass objectUnderTest;
@Before public void setup() {
ioScheduler = new TestScheduler();
androidScheduler = new TestScheduler();
testSubscriber = new TestSubscriber<>();
RxJavaHooks.reset();
RxJavaHooks.setOnIOScheduler(scheduler -> ioScheduler);
RxAndroidPlugins.getInstance().reset();
RxAndroidPlugins.getInstance().registerSchedulersHook(
new RxAndroidSchedulersHook() {
@Override public Scheduler getMainThreadScheduler() {
return androidScheduler;
};
});
Subject<Boolean, Boolean> subject = BehaviorSubject.create(true);
MyClass.OtherClass otherClass = mock(MyClass.OtherClass.class);
when(otherClass.readFromDisk()).thenReturn("mike");;
objectUnderTest = new MyClass(subject, otherClass);
};
@Test public void firstTest() {
objectUnderTest.myMethod().subscribe(testSubscriber);
ioScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
androidScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testSubscriber.assertValueCount(1);
// This passes
};
@Test public void secondTest() {
firstTest();
// This fails!
};
}
为什么会这样?并且是测试中class的bug,还是测试代码?
我认为这可能是使用 RxJava 的问题 1.x,但我对 RxJava 也有类似的问题 2.x。
编辑: 由于测试代码中缺少一行,测试失败。你必须把它放在设置方法中:
AndroidSchedulers.reset()
因为挂钩只被调用一次,由 AndroidSchedulers
class.
subscribeOn
对 Subject
没有实际影响,因为它们没有订阅副作用来转移到另一个线程。因此,当他们得到一个新项目时,他们会在调用者线程上通知他们的消费者。将项目移动到另一个线程应该通过 observeOn
:
private <T> Observable<T> waitForTrue(Callable<T> callable) {
return subject
.filter(value -> value)
.take(1)
.observeOn(Schedulers.io())
.map(value -> callable.call())
.observeOn(AndroidSchedulers.mainThread());
}
此外,您不需要 flatMap
仅执行 callable
,映射就足够了。