RxJava 2 在单元测试中覆盖 IO 调度程序

RxJava 2 overriding IO scheduler in unit test

我正在尝试测试以下 RxKotlin/RxJava 2 代码:

validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }

我正在尝试按如下方式覆盖调度程序:

// Runs before each test suite
RxJavaPlugins.setInitIoSchedulerHandler { Schedulers.trampoline() }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { Schedulers.trampoline() }

但是,运行测试时出现以下错误:

java.lang.ExceptionInInitializerError
...
Caused by: java.lang.NullPointerException: Scheduler Callable result can't be null
    at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
    at io.reactivex.plugins.RxJavaPlugins.applyRequireNonNull(RxJavaPlugins.java:1317)
    at io.reactivex.plugins.RxJavaPlugins.initIoScheduler(RxJavaPlugins.java:306)
    at io.reactivex.schedulers.Schedulers.<clinit>(Schedulers.java:84)

有人遇到过这个问题吗?


测试在使用 RxKotlin/RxJava 1 和以下调度程序覆盖时运行良好:

RxAndroidPlugins.getInstance().registerSchedulersHook(object : RxAndroidSchedulersHook() {
    override fun getMainThreadScheduler() = Schedulers.immediate()
})

RxJavaPlugins.getInstance().registerSchedulersHook(object : RxJavaSchedulersHook() {
    override fun getIOScheduler() = Schedulers.immediate()
})

想通了!它与以下代码中的事实有关:

validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }

validate(data) 正在 returning 一个 Observable,它发出以下内容:emitter.onNext(null)。由于 RxJava 2 不再接受 null 值,因此 flatMap 没有被调用。我将 validate 更改为 return a Completable 并将调度程序覆盖更新为以下内容:

RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }

现在测试通过了!

我建议您采用不同的方法并为您的调度程序添加一个抽象层。这家伙有一个很好的article

在 Kotlin 中看起来像这样

interface SchedulerProvider {
    fun ui(): Scheduler
    fun computation(): Scheduler
    fun trampoline(): Scheduler
    fun newThread(): Scheduler
    fun io(): Scheduler 
}

然后用自己的 SchedulerProvider 实现覆盖它:

class AppSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return AndroidSchedulers.mainThread()
    }

    override fun computation(): Scheduler {
        return Schedulers.computation()
    }

    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun newThread(): Scheduler {
        return Schedulers.newThread()
    }

    override fun io(): Scheduler {
        return Schedulers.io()
    }
}

还有一个用于测试 类:

class TestSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun computation(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun newThread(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun io(): Scheduler {
        return Schedulers.trampoline()
    }
}

你的代码在你调用 RxJava 的地方看起来像这样:

mCompositeDisposable.add(mDataManager.getQuote()
        .subscribeOn(mSchedulerProvider.io())
        .observeOn(mSchedulerProvider.ui())
        .subscribe(Consumer<Quote> {
...

您只需根据测试位置覆盖 SchedulerProvider 的实施。这是一个示例项目供参考,我正在链接将使用 SchedulerProvider 的可测试版本的测试文件:https://github.com/Obaied/DingerQuotes/blob/master/app/src/test/java/com/obaied/dingerquotes/QuotePresenterTest.kt#L31

这正是对我有用的语法:

RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline())

作为建议解决方案的替代方案,这在我的项目中已经运行了一段时间。 您可以在测试中使用它 classes 像这样:

@get:Rule
val immediateSchedulersRule = ImmediateSchedulersRule()

class 看起来像这样:

class ImmediateSchedulersRule : ExternalResource() {

    val immediateScheduler: Scheduler = object : Scheduler() {

        override fun createWorker() = ExecutorScheduler.ExecutorWorker(Executor { it.run() })

        // This prevents errors when scheduling a delay
        override fun scheduleDirect(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            return super.scheduleDirect(run, 0, unit)
        }

    }

    override fun before() {
        RxJavaPlugins.setIoSchedulerHandler { immediateScheduler }
        RxJavaPlugins.setComputationSchedulerHandler { immediateScheduler }
        RxJavaPlugins.setNewThreadSchedulerHandler { immediateScheduler }

        RxAndroidPlugins.setInitMainThreadSchedulerHandler { immediateScheduler }
        RxAndroidPlugins.setMainThreadSchedulerHandler { immediateScheduler }
    }

    override fun after() {
        RxJavaPlugins.reset()
    }

}

您可以找到一种从 TestRule 迁移到 ExternalResource 的方法here and get more info on testing RxJava 2 here