RxJava2 PublishSubject 订阅者在使用 SingleScheduler 从多个线程调用时无法接收项目

RxJava2 PublishSubject subscriber fails to receive items when called from multiple threads using SingleScheduler

我有以下单元测试,我尝试从不同的线程发送 10 个 Strings 并测试我从单个线程接收到那些 Strings。我的问题是这个测试失败了。有时它会成功,但有时我只收到 89 项,之后测试挂起,直到闩锁超时。我是否以错误的方式使用了 SingleScheduler?我还漏掉了什么吗?

val consumerCallerThreadNames = mutableSetOf<String>()
val messageCount = AtomicInteger(0)

val latch = CountDownLatch(MESSAGE_COUNT)

@Test
fun someTest() {
    val msg = "foo"

    val subject = PublishSubject.create<String>()
    subject
            .observeOn(SingleScheduler())
            .subscribe({ message ->
                consumerCallerThreadNames.add(Thread.currentThread().name)
                messageCount.incrementAndGet()
                latch.countDown()
            }, Throwable::printStackTrace)

    1.rangeTo(MESSAGE_COUNT).forEach {
        Thread({
            try {
                subject.onNext(msg)
            } catch (t: Throwable) {
                t.printStackTrace()
            }
        }).start()
    }
    latch.await(10, SECONDS)

    assertThat(consumerCallerThreadNames).hasSize(1)
    assertThat(messageCount.get()).isEqualTo(MESSAGE_COUNT)
}

companion object {
    val MESSAGE_COUNT = 10
}

如果我将其重写为使用单线程 ExecutorService 测试将不再失败,所以问题要么出在 Rx 上,要么是我对 Rx 缺乏了解。

RxJava 要求不能同时调用 on*。这意味着您的代码不是线程安全的。

由于只有主题本身以并发方式使用,因此应该可以通过使用 Subject<T>.toSerialized() 方法序列化(本质上是 Java 的 "synchronized")主题本身来修复。

val subject = PublishSubject.create<String>() 变为 val subject = PublishSubject.create<String>().toSerialized().