单元测试有延迟的 Rxjava observable

Unit testing Rxjava observables that have a delay

我希望能够对具有延迟发射但实际上无需等待延迟时间的 Observable 进行单元测试。有没有办法做到这一点?

我目前正在使用 CountDownHatch 来延迟断言,效果很好,但增加了测试 运行 时间。

示例:

val myObservable: PublishSubject<Boolean> = PublishSubject.create<Boolean>()
fun myObservable(): Observable<Boolean> = myObservable.delay(3, TimeUnit.SECONDS)

@Test
fun testMyObservable() {
    val testObservable = myObservable().test()
    myObservable.onNext(true)

    // val lock = CountDownLatch(1)
    // lock.await(3100, TimeUnit.MILLISECONDS)
    testObservable.assertValue(true)
}

TestScheduler is perfect for this. It has a handy method advanceTimeBy(long, TimeUnit) allows you to control the timing. And Observable.delay has an overload method 取一个 Scheduler

所以直接在myObservable函数中使用默认的Scheduler.computation(),单元测试使用TestScheduler即可。

感谢@aaron-he 的回答,我能够想出一个完整的解决方案。

它使用 TestScheduler 的组合来提前时间,还 RxJavaPlugins 使用 testScheduler 覆盖默认调度程序。这允许在不修改或需要传入 Scheduler.

的情况下测试 myObservable() 函数
@Before
fun setUp() = RxJavaPlugins.reset()

@After
fun tearDown() = RxJavaPlugins.reset()

@Test
fun testMyObservable() {
    val testScheduler = TestScheduler()
    RxJavaPlugins.setComputationSchedulerHandler { testScheduler }

    val testObservable = myObservable().test()
    myObservable.onNext(true)

    testScheduler.advanceTimeBy(2, TimeUnit.SECONDS)
    testObservable.assertEmpty()
    testScheduler.advanceTimeBy(2, TimeUnit.SECONDS)
    testObservable.assertValue(true)
}

发现PublishSubject的一些奇怪行为需要一些时间订阅,例如测试失败:

private val scheduler = TestScheduler()
    @Test
    fun publishSubjectFailedTest() {
        val callback: DelayCallback = mock()

        val myPublishSubject: PublishSubject<Boolean> = PublishSubject.create()
        myPublishSubject
            .delay(10, TimeUnit.SECONDS, scheduler)
            .subscribeOn(scheduler)
            .observeOn(scheduler)
            .subscribe(
                Consumer<Boolean> {
                    callback.onCalldown()
                },
                Consumer<Throwable> {

                },
                Action {

                }
            )
        myPublishSubject.onNext(true)
        scheduler.advanceTimeBy(20, TimeUnit.SECONDS)
        verify(callback, times(1)).onCalldown()
    }

但是如果我们在调用onNext之前加上一个scheduler的时间,比如scheduler.advanceTimeBy(1, TimeUnit.NANOSECONDS) 那么测试就会成功:

 @Test
    fun publishSubjectSuccessTest() {
        val callback: DelayCallback = mock()

        val myPublishSubject: PublishSubject<Boolean> = PublishSubject.create()
        myPublishSubject
            .delay(10, TimeUnit.SECONDS, scheduler)
            .subscribeOn(scheduler)
            .observeOn(scheduler)
            .subscribe(
                Consumer<Boolean> {
                    callback.onCalldown()
                },
                Consumer<Throwable> {

                },
                Action {

                }
            )
        scheduler.advanceTimeBy(1, TimeUnit.NANOSECONDS)//added time of scheduler
        myPublishSubject.onNext(true)
        scheduler.advanceTimeBy(20, TimeUnit.SECONDS)
        verify(callback, times(1)).onCalldown()
    }