在单元测试中,如何在断言结果之前等待 RxJava2 Observable 完成
In an unit test, how to wait for an RxJava2 Observable to finish before asserting results
我正在尝试测试订阅冷可观察对象的部分代码。我的问题是单元测试在断言结果之前没有等待内部可观察调用结束,因此我的测试失败了。
这是我要测试的代码:
public class UserManager {
private UserRepository userRepository;
private PublishSubject<List<User>> usersSubject = PublishSubject.create();
public UserManager(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Observable<List<User>> users() {
return usersSubject;
}
public void onUserIdsReceived(List<String> userIds) {
userRepository.getUsersInfo(userIds)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleObserver<List<User>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {}
@Override
public void onSuccess(@NonNull List<User> users) {
usersSubject.onNext(users);
}
@Override
public void onError(Throwable e) {}
}
}
}
这是我的测试:
@RunWith(MockitoJUnitRunner::class)
class UserManagerTest {
private val userRepository = mock(UserRepository::class.java)
companion object {
@BeforeClass
@JvmStatic
fun setup() {
RxJavaPlugins.setIoSchedulerHandler({ _ -> TestScheduler() })
RxJavaPlugins.setSingleSchedulerHandler({ _ -> TestScheduler() })
RxJavaPlugins.setNewThreadSchedulerHandler({ _ -> TestScheduler() })
RxJavaPlugins.setComputationSchedulerHandler({ _ -> TestScheduler() })
RxAndroidPlugins.setInitMainThreadSchedulerHandler({ _ -> TestScheduler() })
RxAndroidPlugins.setMainThreadSchedulerHandler({ _ -> TestScheduler() })
}
}
@Test
fun testReceivedUserIdResultsInPushedUser() {
// Given
val usersIds = mutableListOf<String>()
usersIds.add("id1")
val usersDetails = mutableListOf<User>()
usersDetails.add(User.testUser())
given(userRepository.getUsersInfo(usersIds)).willReturn(Single.just(usersDetails))
val userManager = UserManager(userRepository)
val testObserver = TestObserver<List<User>>()
userManager.users().subscribeWith(testObserver)
// When
userManager.onUsersIdsReceived(usersIds)
// Then
testObserver.assertNoErrors()
testObserver.assertNotComplete()
testObserver.assertValueCount(1)
testObserver.assertValue( { detectedUsers: List<DetectedUser> ->
// My test
})
}
我的测试失败是因为我的 testObserver 没有收到任何事件,最合乎逻辑的原因是测试没有等待 userRepository.getUsersInfo(userIds) 调用结束,即使我嘲笑了UserRepository 并使用了 TestScheduler。
有没有办法等待内部 Observable 调用结束,从而使我的代码可测试?
我正在使用 trampoline
而不是像这样的 TestScheduler,它是同步执行的。
// override Schedulers.io()
RxJavaPlugins.setIoSchedulerHandler(schedulerCallable -> Schedulers.trampoline());
// override AndroidSchedulers.mainThread()
RxAndroidPlugins.setInitMainThreadSchedulerHandler(schedulerCallable -> Schedulers.trampoline());
也考虑删除
RxAndroidPlugins.setMainThreadSchedulerHandler({ _ -> TestScheduler() })
我想我遇到了一些问题。
我正在尝试测试订阅冷可观察对象的部分代码。我的问题是单元测试在断言结果之前没有等待内部可观察调用结束,因此我的测试失败了。
这是我要测试的代码:
public class UserManager {
private UserRepository userRepository;
private PublishSubject<List<User>> usersSubject = PublishSubject.create();
public UserManager(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Observable<List<User>> users() {
return usersSubject;
}
public void onUserIdsReceived(List<String> userIds) {
userRepository.getUsersInfo(userIds)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleObserver<List<User>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {}
@Override
public void onSuccess(@NonNull List<User> users) {
usersSubject.onNext(users);
}
@Override
public void onError(Throwable e) {}
}
}
}
这是我的测试:
@RunWith(MockitoJUnitRunner::class)
class UserManagerTest {
private val userRepository = mock(UserRepository::class.java)
companion object {
@BeforeClass
@JvmStatic
fun setup() {
RxJavaPlugins.setIoSchedulerHandler({ _ -> TestScheduler() })
RxJavaPlugins.setSingleSchedulerHandler({ _ -> TestScheduler() })
RxJavaPlugins.setNewThreadSchedulerHandler({ _ -> TestScheduler() })
RxJavaPlugins.setComputationSchedulerHandler({ _ -> TestScheduler() })
RxAndroidPlugins.setInitMainThreadSchedulerHandler({ _ -> TestScheduler() })
RxAndroidPlugins.setMainThreadSchedulerHandler({ _ -> TestScheduler() })
}
}
@Test
fun testReceivedUserIdResultsInPushedUser() {
// Given
val usersIds = mutableListOf<String>()
usersIds.add("id1")
val usersDetails = mutableListOf<User>()
usersDetails.add(User.testUser())
given(userRepository.getUsersInfo(usersIds)).willReturn(Single.just(usersDetails))
val userManager = UserManager(userRepository)
val testObserver = TestObserver<List<User>>()
userManager.users().subscribeWith(testObserver)
// When
userManager.onUsersIdsReceived(usersIds)
// Then
testObserver.assertNoErrors()
testObserver.assertNotComplete()
testObserver.assertValueCount(1)
testObserver.assertValue( { detectedUsers: List<DetectedUser> ->
// My test
})
}
我的测试失败是因为我的 testObserver 没有收到任何事件,最合乎逻辑的原因是测试没有等待 userRepository.getUsersInfo(userIds) 调用结束,即使我嘲笑了UserRepository 并使用了 TestScheduler。 有没有办法等待内部 Observable 调用结束,从而使我的代码可测试?
我正在使用 trampoline
而不是像这样的 TestScheduler,它是同步执行的。
// override Schedulers.io()
RxJavaPlugins.setIoSchedulerHandler(schedulerCallable -> Schedulers.trampoline());
// override AndroidSchedulers.mainThread()
RxAndroidPlugins.setInitMainThreadSchedulerHandler(schedulerCallable -> Schedulers.trampoline());
也考虑删除
RxAndroidPlugins.setMainThreadSchedulerHandler({ _ -> TestScheduler() })
我想我遇到了一些问题。