Rx-Kotlin awaitTerminalEvent 永远不会 onComplete

Rx-Kotlin awaitTerminalEvent never getting onComplete

我试图更好地了解如何使用 Rx-Kotlin 进行单元测试,但我未能成功将主题设置为 "completed"。结果,我总是在等待 5 秒的超时(onComplete 应该是立即的),然后在 assertComplete 上失败。

我对 awaitTerminalEvent is that it should only block until the onComplete is called. I have also looked into TestScheduler, 的理解,但我认为这里不需要。

任何可以引导我朝着正确方向前进的帮助或文档将不胜感激。

@Test
fun testObservable() {
    val subject = BehaviorSubject.create<Int>()
    subject.onNext(0)

    TestSubscriber<Int>().apply {
        subject.subscribe({
            System.out.println(it)
            subject.onNext(1)
            subject.onComplete()
        })

        this.awaitTerminalEvent(5, TimeUnit.SECONDS)
        this.assertComplete()
        this.assertValue(1)
    }
}

您以错误的方式使用了错误的工具...

  • TestSubscriber用于测试Flowable,你应该在这里使用TestObserver
  • 您应该订阅 TestObserver(或 Flowable 中的 TestSubscriber),以便它监控排放并能够等待终端事件和断言值。在您的代码中, TestSubscriber 未附加到任何流,因此它永远不会获得任何事件。

试图模仿您的代码,它可能是这样的:

 @Test
fun testObservable() {
    val subject = BehaviorSubject.create<Int>()
    subject.onNext(0)

    TestObserver<Int>().apply {
        subject.doOnNext {
            System.out.println(it)
            subject.onNext(1)
            subject.onComplete()
        }
                .subscribe(this)

        this.awaitTerminalEvent(5, TimeUnit.SECONDS)
        this.assertComplete()
        this.assertValue(1)
    }
}  

如您所见,我正在使用 TestObserver 订阅是通过 TestObserver 对象完成的,主题 onNext()onComplete() 已移至 doOnNext()。测试将失败,因为您有两个发射值,而测试仅断言单个“1”值。

一般来说有点不对,你在onNext()里面用subject再emit,然后调用onComplete(),你可以先subscribe再emit到外面。像这样:

TestObserver<Int>().apply {
        subject.subscribe(this)
        subject.onNext(1)
        subject.onComplete()
        ....
}