RxJava 2 在单元测试中覆盖 IO 调度程序
RxJava 2 overriding IO scheduler in unit test
我正在尝试测试以下 RxKotlin/RxJava 2 代码:
validate(data)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap { ... }
我正在尝试按如下方式覆盖调度程序:
// Runs before each test suite
RxJavaPlugins.setInitIoSchedulerHandler { Schedulers.trampoline() }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { Schedulers.trampoline() }
但是,运行测试时出现以下错误:
java.lang.ExceptionInInitializerError
...
Caused by: java.lang.NullPointerException: Scheduler Callable result can't be null
at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
at io.reactivex.plugins.RxJavaPlugins.applyRequireNonNull(RxJavaPlugins.java:1317)
at io.reactivex.plugins.RxJavaPlugins.initIoScheduler(RxJavaPlugins.java:306)
at io.reactivex.schedulers.Schedulers.<clinit>(Schedulers.java:84)
有人遇到过这个问题吗?
测试在使用 RxKotlin/RxJava 1 和以下调度程序覆盖时运行良好:
RxAndroidPlugins.getInstance().registerSchedulersHook(object : RxAndroidSchedulersHook() {
override fun getMainThreadScheduler() = Schedulers.immediate()
})
RxJavaPlugins.getInstance().registerSchedulersHook(object : RxJavaSchedulersHook() {
override fun getIOScheduler() = Schedulers.immediate()
})
想通了!它与以下代码中的事实有关:
validate(data)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap { ... }
validate(data)
正在 returning 一个 Observable
,它发出以下内容:emitter.onNext(null)
。由于 RxJava 2 不再接受 null
值,因此 flatMap
没有被调用。我将 validate
更改为 return a Completable
并将调度程序覆盖更新为以下内容:
RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
现在测试通过了!
我建议您采用不同的方法并为您的调度程序添加一个抽象层。这家伙有一个很好的article。
在 Kotlin 中看起来像这样
interface SchedulerProvider {
fun ui(): Scheduler
fun computation(): Scheduler
fun trampoline(): Scheduler
fun newThread(): Scheduler
fun io(): Scheduler
}
然后用自己的 SchedulerProvider 实现覆盖它:
class AppSchedulerProvider : SchedulerProvider {
override fun ui(): Scheduler {
return AndroidSchedulers.mainThread()
}
override fun computation(): Scheduler {
return Schedulers.computation()
}
override fun trampoline(): Scheduler {
return Schedulers.trampoline()
}
override fun newThread(): Scheduler {
return Schedulers.newThread()
}
override fun io(): Scheduler {
return Schedulers.io()
}
}
还有一个用于测试 类:
class TestSchedulerProvider : SchedulerProvider {
override fun ui(): Scheduler {
return Schedulers.trampoline()
}
override fun computation(): Scheduler {
return Schedulers.trampoline()
}
override fun trampoline(): Scheduler {
return Schedulers.trampoline()
}
override fun newThread(): Scheduler {
return Schedulers.trampoline()
}
override fun io(): Scheduler {
return Schedulers.trampoline()
}
}
你的代码在你调用 RxJava 的地方看起来像这样:
mCompositeDisposable.add(mDataManager.getQuote()
.subscribeOn(mSchedulerProvider.io())
.observeOn(mSchedulerProvider.ui())
.subscribe(Consumer<Quote> {
...
您只需根据测试位置覆盖 SchedulerProvider
的实施。这是一个示例项目供参考,我正在链接将使用 SchedulerProvider
的可测试版本的测试文件:https://github.com/Obaied/DingerQuotes/blob/master/app/src/test/java/com/obaied/dingerquotes/QuotePresenterTest.kt#L31
这正是对我有用的语法:
RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline())
作为建议解决方案的替代方案,这在我的项目中已经运行了一段时间。
您可以在测试中使用它 classes 像这样:
@get:Rule
val immediateSchedulersRule = ImmediateSchedulersRule()
class 看起来像这样:
class ImmediateSchedulersRule : ExternalResource() {
val immediateScheduler: Scheduler = object : Scheduler() {
override fun createWorker() = ExecutorScheduler.ExecutorWorker(Executor { it.run() })
// This prevents errors when scheduling a delay
override fun scheduleDirect(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
return super.scheduleDirect(run, 0, unit)
}
}
override fun before() {
RxJavaPlugins.setIoSchedulerHandler { immediateScheduler }
RxJavaPlugins.setComputationSchedulerHandler { immediateScheduler }
RxJavaPlugins.setNewThreadSchedulerHandler { immediateScheduler }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { immediateScheduler }
RxAndroidPlugins.setMainThreadSchedulerHandler { immediateScheduler }
}
override fun after() {
RxJavaPlugins.reset()
}
}
您可以找到一种从 TestRule 迁移到 ExternalResource 的方法here and get more info on testing RxJava 2 here。
我正在尝试测试以下 RxKotlin/RxJava 2 代码:
validate(data)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap { ... }
我正在尝试按如下方式覆盖调度程序:
// Runs before each test suite
RxJavaPlugins.setInitIoSchedulerHandler { Schedulers.trampoline() }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { Schedulers.trampoline() }
但是,运行测试时出现以下错误:
java.lang.ExceptionInInitializerError
...
Caused by: java.lang.NullPointerException: Scheduler Callable result can't be null
at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
at io.reactivex.plugins.RxJavaPlugins.applyRequireNonNull(RxJavaPlugins.java:1317)
at io.reactivex.plugins.RxJavaPlugins.initIoScheduler(RxJavaPlugins.java:306)
at io.reactivex.schedulers.Schedulers.<clinit>(Schedulers.java:84)
有人遇到过这个问题吗?
测试在使用 RxKotlin/RxJava 1 和以下调度程序覆盖时运行良好:
RxAndroidPlugins.getInstance().registerSchedulersHook(object : RxAndroidSchedulersHook() {
override fun getMainThreadScheduler() = Schedulers.immediate()
})
RxJavaPlugins.getInstance().registerSchedulersHook(object : RxJavaSchedulersHook() {
override fun getIOScheduler() = Schedulers.immediate()
})
想通了!它与以下代码中的事实有关:
validate(data)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap { ... }
validate(data)
正在 returning 一个 Observable
,它发出以下内容:emitter.onNext(null)
。由于 RxJava 2 不再接受 null
值,因此 flatMap
没有被调用。我将 validate
更改为 return a Completable
并将调度程序覆盖更新为以下内容:
RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
现在测试通过了!
我建议您采用不同的方法并为您的调度程序添加一个抽象层。这家伙有一个很好的article。
在 Kotlin 中看起来像这样
interface SchedulerProvider {
fun ui(): Scheduler
fun computation(): Scheduler
fun trampoline(): Scheduler
fun newThread(): Scheduler
fun io(): Scheduler
}
然后用自己的 SchedulerProvider 实现覆盖它:
class AppSchedulerProvider : SchedulerProvider {
override fun ui(): Scheduler {
return AndroidSchedulers.mainThread()
}
override fun computation(): Scheduler {
return Schedulers.computation()
}
override fun trampoline(): Scheduler {
return Schedulers.trampoline()
}
override fun newThread(): Scheduler {
return Schedulers.newThread()
}
override fun io(): Scheduler {
return Schedulers.io()
}
}
还有一个用于测试 类:
class TestSchedulerProvider : SchedulerProvider {
override fun ui(): Scheduler {
return Schedulers.trampoline()
}
override fun computation(): Scheduler {
return Schedulers.trampoline()
}
override fun trampoline(): Scheduler {
return Schedulers.trampoline()
}
override fun newThread(): Scheduler {
return Schedulers.trampoline()
}
override fun io(): Scheduler {
return Schedulers.trampoline()
}
}
你的代码在你调用 RxJava 的地方看起来像这样:
mCompositeDisposable.add(mDataManager.getQuote()
.subscribeOn(mSchedulerProvider.io())
.observeOn(mSchedulerProvider.ui())
.subscribe(Consumer<Quote> {
...
您只需根据测试位置覆盖 SchedulerProvider
的实施。这是一个示例项目供参考,我正在链接将使用 SchedulerProvider
的可测试版本的测试文件:https://github.com/Obaied/DingerQuotes/blob/master/app/src/test/java/com/obaied/dingerquotes/QuotePresenterTest.kt#L31
这正是对我有用的语法:
RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline())
作为建议解决方案的替代方案,这在我的项目中已经运行了一段时间。 您可以在测试中使用它 classes 像这样:
@get:Rule
val immediateSchedulersRule = ImmediateSchedulersRule()
class 看起来像这样:
class ImmediateSchedulersRule : ExternalResource() {
val immediateScheduler: Scheduler = object : Scheduler() {
override fun createWorker() = ExecutorScheduler.ExecutorWorker(Executor { it.run() })
// This prevents errors when scheduling a delay
override fun scheduleDirect(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
return super.scheduleDirect(run, 0, unit)
}
}
override fun before() {
RxJavaPlugins.setIoSchedulerHandler { immediateScheduler }
RxJavaPlugins.setComputationSchedulerHandler { immediateScheduler }
RxJavaPlugins.setNewThreadSchedulerHandler { immediateScheduler }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { immediateScheduler }
RxAndroidPlugins.setMainThreadSchedulerHandler { immediateScheduler }
}
override fun after() {
RxJavaPlugins.reset()
}
}
您可以找到一种从 TestRule 迁移到 ExternalResource 的方法here and get more info on testing RxJava 2 here。