订阅主题时奇怪的 TestObserver 行为

Odd TestObserver behaviour when subscribing to a Subject

考虑以下 RxJava 2 Kotlin 中的片段:

// 1. Create subject
val subject = PublishSubject.create<Int>()

// 2. Get observable
val observable = subject.subscribeOn(Schedulers.io())

// 3. Subscribe
val observer = observable.test()

// 4. Trigger next
subject.onNext(42)

// 5. Await
observer.awaitCount(1)

// 6. Assert value
observer.assertValue(42)

根据我的理解,observer 应该能够在等待语句 5 和语句 6 的断言后接收到 42 应该会成功。

然而,实际发生的是:5 阻塞直到超时,因为没有收到任何值并且 6 上的断言失败。

此外,如果我在 3 上设置断点并在暂停后继续执行,一切都会正常。看起来像是线程问题。

我显然在这里遗漏了一些东西。使用热可观察对象的正确方法是什么?

通过取消 subscribeOn,您将对 Subject 的实际订阅放在另一个线程上,这可能需要更长的时间,以便 subject.onNext(42) 仍然可以找到未订阅的 Subject.

除了在 PublishSubject 上使用 subscribeOn 没有实际用途之外,您可以通过循环等待订阅:

while (!subject.hasObservers()) { Thread.sleep(1); }