JUnit5 test with LiveData doesn't execute subscriber's callback
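Background:
I have a simple application that fetches a list of movies using a REST API call. The project structure is as follows:
Activity -> ViewModel -> Repository -> ApiService (Retrofit Interface)
The activity subscribes to a LiveData and listens for changes.
The ViewModel hosts a MediatorLiveData that is observed by the activity. Initially the ViewModel sets a Resource.loading(..) value on the MediatorLiveData.
The ViewModel then calls the repository to fetch the movie list from ApiService.
ApiService returns a LiveData of either Resource.success(..) or Resource.error(..).
The ViewModel then merges the LiveData result from ApiService into the MediatorLiveData.
My question:
In the unit test, only the first emission, Resource.loading(..), is made by the ViewModel's MediatorLiveData. The MediatorLiveData never emits any data from the repository.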
ViewModel.class
private var discoverMovieLiveData: MediatorLiveData<Resource<DiscoverMovieResponse>> = MediatorLiveData()

fun observeDiscoverMovie(): LiveData<Resource<DiscoverMovieResponse>> {
    return discoverMovieLiveData
}

fun fetchDiscoverMovies(page: Int) {
    discoverMovieLiveData.value = Resource.loading(null) // this emission is observed immediately
    val source = movieRepository.fetchDiscoverMovies(page)
    discoverMovieLiveData.addSource(source) {
        discoverMovieLiveData.value = it // never gets called
        discoverMovieLiveData.removeSource(source)
    }
}
Repository.class
fun fetchDiscoverMovies(page: Int): LiveData<Resource<DiscoverMovieResponse>> {
    return LiveDataReactiveStreams.fromPublisher(
        apiService.fetchDiscoverMovies(page)
            .subscribeOn(Schedulers.io())
            .map { d ->
                Resource.success(d) // never gets called in unit test
            }
            .onErrorReturn { e ->
                Resource.error(ApiErrorHandler.getErrorByThrowable(e), null) // never gets called in unit test
            }
    )
}
Unit test
@Test
fun loadMovieListFromNetwork() {
    val mockResponse = DiscoverMovieResponse(1, emptyList(), 100, 10)
    // successCall wraps the Retrofit result inside a Flowable<DiscoverMovieResponse>
    val call: Flowable<DiscoverMovieResponse> = successCall(mockResponse)
    whenever(apiService.fetchDiscoverMovies(1)).thenReturn(call)
    viewModel.fetchDiscoverMovies(1)
    verify(apiService).fetchDiscoverMovies(1)
    verifyNoMoreInteractions(apiService)
    val liveData = viewModel.observeDiscoverMovie()
    val observer: Observer<Resource<DiscoverMovieResponse>> = mock()
    liveData.observeForever(observer)
    verify(observer).onChanged(
        Resource.success(mockResponse) // TEST FAILS HERE AND GETS "Resource.loading(null)"
    )
}
Resource is a generic wrapper class that wraps data for different scenarios, e.g. loading, success, error.
class Resource<out T>(val status: Status, val data: T?, val message: String?) {
    .......
}
EDIT #1:
For testing purposes, I changed the Rx thread in the repository to run on the main thread. This ended up throwing a "Looper not mocked" exception.
fun fetchDiscoverMovies(page: Int): LiveData<Resource<DiscoverMovieResponse>> {
    return LiveDataReactiveStreams.fromPublisher(
        apiService.fetchDiscoverMovies(page)
            .subscribeOn(AndroidSchedulers.mainThread())
            .map {...}
            .onErrorReturn {...}
    )
}
In the test class:
@ExtendWith(InstantExecutorExtension::class)
class MainViewModelTest {
    companion object {
        @ClassRule
        @JvmField
        val schedulers = RxImmediateSchedulerRule()
    }

    @Test
    fun loadMovieListFromNetwork() {
        .....
    }
}
RxImmediateSchedulerRule.class
class RxImmediateSchedulerRule : TestRule {
    // Scheduler that runs every task synchronously on the calling thread
    private val immediate = object : Scheduler() {
        override fun createWorker(): Worker {
            return ExecutorScheduler.ExecutorWorker(Executor { it.run() })
        }
    }

    override fun apply(base: Statement, description: Description): Statement {
        return object : Statement() {
            @Throws(Throwable::class)
            override fun evaluate() {
                RxJavaPlugins.setInitIoSchedulerHandler { immediate }
                RxJavaPlugins.setInitComputationSchedulerHandler { immediate }
                RxJavaPlugins.setInitNewThreadSchedulerHandler { immediate }
                RxJavaPlugins.setInitSingleSchedulerHandler { immediate }
                RxAndroidPlugins.setInitMainThreadSchedulerHandler { immediate }
                try {
                    base.evaluate()
                } finally {
                    RxJavaPlugins.reset()
                    RxAndroidPlugins.reset()
                }
            }
        }
    }
}
InstantExecutorExtension.class
class InstantExecutorExtension : BeforeEachCallback, AfterEachCallback {
    override fun beforeEach(context: ExtensionContext?) {
        // Run all Architecture Components tasks synchronously on the test thread
        ArchTaskExecutor.getInstance().setDelegate(object : TaskExecutor() {
            override fun executeOnDiskIO(runnable: Runnable) {
                runnable.run()
            }

            override fun postToMainThread(runnable: Runnable) {
                runnable.run()
            }

            override fun isMainThread(): Boolean {
                return true
            }
        })
    }

    override fun afterEach(context: ExtensionContext?) {
        ArchTaskExecutor.getInstance().setDelegate(null)
    }
}
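The way you have specified RxImmediateSchedulerRule does not work with JUnit5: @ClassRule is a JUnit4 mechanism. If you put a breakpoint in the apply() method, you will see that it is never executed.
Instead, you should create an extension like this: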
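// Imports assume RxJava 2 / RxAndroid 2 package names
import io.reactivex.android.plugins.RxAndroidPlugins
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import org.junit.jupiter.api.extension.AfterTestExecutionCallback
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback
import org.junit.jupiter.api.extension.ExtensionContext

class TestSchedulerExtension : BeforeTestExecutionCallback, AfterTestExecutionCallback {
    override fun beforeTestExecution(context: ExtensionContext?) {
        // Route every scheduler to the trampoline so work runs on the test thread
        RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
        RxJavaPlugins.setComputationSchedulerHandler { Schedulers.trampoline() }
        RxJavaPlugins.setNewThreadSchedulerHandler { Schedulers.trampoline() }
        RxAndroidPlugins.setMainThreadSchedulerHandler { Schedulers.trampoline() }
    }

    override fun afterTestExecution(context: ExtensionContext?) {
        RxJavaPlugins.reset()
        RxAndroidPlugins.reset()
    }
}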
Then apply TestSchedulerExtension in the test class's annotation:
@ExtendWith(value = [InstantExecutorExtension::class, TestSchedulerExtension::class])
class MainViewModelTest {
    private val apiService: ApiService = mock()
    private lateinit var movieRepository: MovieRepository
    private lateinit var viewModel: MainViewModel

    @BeforeEach
    fun init() {
        movieRepository = MovieRepository(apiService)
        viewModel = MainViewModel(movieRepository)
    }

    @Test
    fun loadMovieListFromNetwork() {
        val mockResponse = DiscoverMovieResponse(1, emptyList(), 100, 10, 0, "", false)
        val call: Flowable<DiscoverMovieResponse> = Flowable.just(mockResponse)
        whenever(apiService.fetchDiscoverMovies(1)).thenReturn(call)
        viewModel.fetchDiscoverMovies(1)
        assertEquals(Resource.success(mockResponse), LiveDataTestUtil.getValue(viewModel.discoverMovieLiveData))
    }
}
Now the test will pass. You have verified that the observer is dispatched the expected value.
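Why the scheduler swap matters can be sketched without Android or RxJava. The snippet below is only an illustration under stated assumptions: FakeViewModel and the two helper functions are hypothetical stand-ins, and a plain java.util.concurrent.Executor plays the role of the Rx scheduler. With a direct executor (analogous to a trampoline scheduler) the result is already there when fetch() returns; with a background executor the test thread still sees the loading state right after the call.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the ViewModel: sets "loading", then asks the
// executor (our stand-in for an Rx scheduler) to deliver "success".
class FakeViewModel(private val executor: Executor) {
    val state = AtomicReference("idle")

    fun fetch() {
        state.set("loading")
        executor.execute { state.set("success") }
    }
}

// Direct executor: the task runs synchronously inside fetch().
fun stateWithDirectExecutor(): String {
    val viewModel = FakeViewModel(Executor { it.run() })
    viewModel.fetch()
    return viewModel.state.get()
}

// Background executor: the task is held back by a gate, so the value read
// right after fetch() is deterministic for this demonstration.
fun stateWithBackgroundExecutor(): Pair<String, String> {
    val gate = CountDownLatch(1)
    val done = CountDownLatch(1)
    val pool = Executors.newSingleThreadExecutor()
    val gated = Executor { task ->
        pool.execute {
            gate.await()
            task.run()
            done.countDown()
        }
    }
    val viewModel = FakeViewModel(gated)
    viewModel.fetch()
    val before = viewModel.state.get() // background work has not run yet
    gate.countDown()                   // let the background task proceed
    done.await(2, TimeUnit.SECONDS)
    val after = viewModel.state.get()
    pool.shutdown()
    return before to after
}

fun main() {
    println(stateWithDirectExecutor())     // success
    println(stateWithBackgroundExecutor()) // (loading, success)
}
```

An assertion made immediately after fetch() corresponds to the "before" value, which is why the original test only ever saw Resource.loading(null).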
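From another angle: is this a unit test? Certainly not, because in this test we interact with 2 units: MainViewModel and MovieRepository. This better fits the term "integration test". Had you mocked MovieRepository, this would be a valid unit test: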
@ExtendWith(value = [InstantExecutorExtension::class, TestSchedulerExtension::class])
class MainViewModelTest {
    private val movieRepository: MovieRepository = mock()
    private val viewModel = MainViewModel(movieRepository)

    @Test
    fun loadMovieListFromNetwork() {
        val mockResponse = DiscoverMovieResponse(1, emptyList(), 100, 10, 0, "", false)
        val liveData =
            MutableLiveData<Resource<DiscoverMovieResponse>>().apply { value = Resource.success(mockResponse) }
        whenever(movieRepository.fetchDiscoverMovies(1)).thenReturn(liveData)
        viewModel.fetchDiscoverMovies(1)
        assertEquals(Resource.success(mockResponse), LiveDataTestUtil.getValue(viewModel.discoverMovieLiveData))
    }
}
Note that MovieRepository should be declared open, along with fetchDiscoverMovies(), in order to be able to mock it. Alternatively, you can consider using the kotlin-allopen plugin.
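For the kotlin-allopen route, a Gradle Kotlin DSL fragment might look roughly like the one below. Treat it as a sketch: the plugin version shown and the OpenForTesting annotation (with its com.example package) are assumptions for illustration, not something taken from the project in question.

```kotlin
// build.gradle.kts of the module that contains MovieRepository (sketch)
plugins {
    // Assumption: use the same version as the Kotlin plugin already applied in your build.
    kotlin("plugin.allopen") version "1.9.24"
}

allOpen {
    // Hypothetical marker annotation; classes annotated with it, and their
    // members, are compiled as open so that they can be mocked.
    annotation("com.example.OpenForTesting")
}
```

With this in place, declaring `annotation class OpenForTesting` in com.example and annotating MovieRepository with it would remove the need to write open by hand.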
I think all you need to do is change
val call: Flowable<DiscoverMovieResponse> = successCall(mockResponse)
to
val call: Flowable<DiscoverMovieResponse> = Flowable.just(mockResponse)
and make use of the LiveDataTestUtil class from Google's Architecture Components samples. You will need to copy/paste it into your project.
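For reference, a helper in the spirit of LiveDataTestUtil could be sketched as follows. This is an approximation written from memory of the samples rather than a verbatim copy, so the exact code there may differ; it assumes the androidx.lifecycle artifacts are on the test classpath and that an instant executor (such as InstantExecutorExtension) is active so that observeForever can be called from the test thread.

```kotlin
import androidx.lifecycle.LiveData
import androidx.lifecycle.Observer
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

object LiveDataTestUtil {
    // Returns the first value delivered to an observer of liveData, or null
    // if nothing is delivered within two seconds.
    fun <T> getValue(liveData: LiveData<T>): T? {
        var result: T? = null
        val latch = CountDownLatch(1)
        val observer = object : Observer<T> {
            override fun onChanged(value: T) {
                result = value
                latch.countDown()
                // One value is enough; stop observing to avoid leaks.
                liveData.removeObserver(this)
            }
        }
        liveData.observeForever(observer)
        latch.await(2, TimeUnit.SECONDS)
        return result
    }
}
```

Because it observes the LiveData itself, the helper also activates a MediatorLiveData, which is what allows sources registered with addSource to start emitting.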
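So at the end of the day your new test would look like this (assuming all the wiring and mocks are set up properly at the top of the test class). This also assumes you are using InstantExecutorExtension, as azizbekian showed above.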
@Test
fun loadMovieListFromNetwork() {
    val mockResponse = DiscoverMovieResponse(1, emptyList(), 100, 10)
    val call: Flowable<DiscoverMovieResponse> = Flowable.just(mockResponse)
    whenever(apiService.fetchDiscoverMovies(1)).thenReturn(call)
    viewModel.fetchDiscoverMovies(1)
    assertEquals(Resource.success(mockResponse), LiveDataTestUtil.getValue(viewModel.discoverMovieLiveData))
}
If that test passes, it means you were able to successfully observe the result of the network request and receive a successful response.