RxJava2 PublishSubject 订阅者在使用 SingleScheduler 从多个线程调用时无法接收项目
RxJava2 PublishSubject subscriber fails to receive items when called from multiple threads using SingleScheduler
我有以下单元测试,我尝试从不同的线程发送 10 个 String
s 并测试我从单个线程接收到那些 String
s。我的问题是这个测试失败了。有时它会成功,但有时我只收到 8
或 9
项,之后测试挂起,直到闩锁超时。我是否以错误的方式使用了 SingleScheduler
?我还漏掉了什么吗?
val consumerCallerThreadNames = mutableSetOf<String>()
val messageCount = AtomicInteger(0)
val latch = CountDownLatch(MESSAGE_COUNT)
@Test
fun someTest() {
val msg = "foo"
val subject = PublishSubject.create<String>()
subject
.observeOn(SingleScheduler())
.subscribe({ message ->
consumerCallerThreadNames.add(Thread.currentThread().name)
messageCount.incrementAndGet()
latch.countDown()
}, Throwable::printStackTrace)
1.rangeTo(MESSAGE_COUNT).forEach {
Thread({
try {
subject.onNext(msg)
} catch (t: Throwable) {
t.printStackTrace()
}
}).start()
}
latch.await(10, SECONDS)
assertThat(consumerCallerThreadNames).hasSize(1)
assertThat(messageCount.get()).isEqualTo(MESSAGE_COUNT)
}
companion object {
val MESSAGE_COUNT = 10
}
如果我将其重写为使用单线程 ExecutorService
测试将不再失败,所以问题要么出在 Rx 上,要么是我对 Rx 缺乏了解。
RxJava 要求不能同时调用 on*
。这意味着您的代码不是线程安全的。
由于只有主题本身以并发方式使用,因此应该可以通过使用 Subject<T>.toSerialized()
方法序列化(本质上是 Java 的 "synchronized")主题本身来修复。
val subject = PublishSubject.create<String>()
变为 val subject = PublishSubject.create<String>().toSerialized()
.
我有以下单元测试,我尝试从不同的线程发送 10 个 String
s 并测试我从单个线程接收到那些 String
s。我的问题是这个测试失败了。有时它会成功,但有时我只收到 8
或 9
项,之后测试挂起,直到闩锁超时。我是否以错误的方式使用了 SingleScheduler
?我还漏掉了什么吗?
val consumerCallerThreadNames = mutableSetOf<String>()
val messageCount = AtomicInteger(0)
val latch = CountDownLatch(MESSAGE_COUNT)
@Test
fun someTest() {
val msg = "foo"
val subject = PublishSubject.create<String>()
subject
.observeOn(SingleScheduler())
.subscribe({ message ->
consumerCallerThreadNames.add(Thread.currentThread().name)
messageCount.incrementAndGet()
latch.countDown()
}, Throwable::printStackTrace)
1.rangeTo(MESSAGE_COUNT).forEach {
Thread({
try {
subject.onNext(msg)
} catch (t: Throwable) {
t.printStackTrace()
}
}).start()
}
latch.await(10, SECONDS)
assertThat(consumerCallerThreadNames).hasSize(1)
assertThat(messageCount.get()).isEqualTo(MESSAGE_COUNT)
}
companion object {
val MESSAGE_COUNT = 10
}
如果我将其重写为使用单线程 ExecutorService
测试将不再失败,所以问题要么出在 Rx 上,要么是我对 Rx 缺乏了解。
RxJava 要求不能同时调用 on*
。这意味着您的代码不是线程安全的。
由于只有主题本身以并发方式使用,因此应该可以通过使用 Subject<T>.toSerialized()
方法序列化(本质上是 Java 的 "synchronized")主题本身来修复。
val subject = PublishSubject.create<String>()
变为 val subject = PublishSubject.create<String>().toSerialized()
.