订阅主题时奇怪的 TestObserver 行为
Odd TestObserver behaviour when subscribing to a Subject
考虑以下 RxJava 2 Kotlin 中的片段:
// 1. Create subject
val subject = PublishSubject.create<Int>()
// 2. Get observable
val observable = subject.subscribeOn(Schedulers.io())
// 3. Subscribe
val observer = observable.test()
// 4. Trigger next
subject.onNext(42)
// 5. Await
observer.awaitCount(1)
// 6. Assert value
observer.assertValue(42)
根据我的理解,observer
应该能够在等待语句 5 和语句 6 的断言后接收到 42
应该会成功。
然而,实际发生的是:5 阻塞直到超时,因为没有收到任何值并且 6 上的断言失败。
此外,如果我在 3 上设置断点并在暂停后继续执行,一切都会正常。看起来像是线程问题。
我显然在这里遗漏了一些东西。使用热可观察对象的正确方法是什么?
通过取消 subscribeOn
,您将对 Subject
的实际订阅放在另一个线程上,这可能需要更长的时间,以便 subject.onNext(42)
仍然可以找到未订阅的 Subject
.
除了在 PublishSubject
上使用 subscribeOn
没有实际用途之外,您可以通过循环等待订阅:
while (!subject.hasObservers()) { Thread.sleep(1); }
考虑以下 RxJava 2 Kotlin 中的片段:
// 1. Create subject
val subject = PublishSubject.create<Int>()
// 2. Get observable
val observable = subject.subscribeOn(Schedulers.io())
// 3. Subscribe
val observer = observable.test()
// 4. Trigger next
subject.onNext(42)
// 5. Await
observer.awaitCount(1)
// 6. Assert value
observer.assertValue(42)
根据我的理解,observer
应该能够在等待语句 5 和语句 6 的断言后接收到 42
应该会成功。
然而,实际发生的是:5 阻塞直到超时,因为没有收到任何值并且 6 上的断言失败。
此外,如果我在 3 上设置断点并在暂停后继续执行,一切都会正常。看起来像是线程问题。
我显然在这里遗漏了一些东西。使用热可观察对象的正确方法是什么?
通过取消 subscribeOn
,您将对 Subject
的实际订阅放在另一个线程上,这可能需要更长的时间,以便 subject.onNext(42)
仍然可以找到未订阅的 Subject
.
除了在 PublishSubject
上使用 subscribeOn
没有实际用途之外,您可以通过循环等待订阅:
while (!subject.hasObservers()) { Thread.sleep(1); }