RxJava2 PublishSubject 的意外行为
Unexpected behavior with RxJava2 PublishSubject
我有以下使用 PublishSubject
.
的代码
val subject = PublishSubject.create<Int>()
val o1: Observable<String> =
subject.observeOn(Schedulers.newThread()).map { i: Int ->
println("${Thread.currentThread()} | ${Date()} | map => $i")
i.toString()
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 1")
subject.onNext(1)
1) 我从中创建一个 Observable
并映射它(出于本示例的目的,我只是转换为 String
)=> o1
.
2) 然后我订阅了o1
3次。
3) 最后我通过调用 subject.onNext(1)
.
"publish" 一个事件
令我惊讶的是,我得到了以下输出:
Thread[main,5,main] | Mon Jun 19 09:46:37 PDT 2017 | submitting 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (1) => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (2) => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (3) => 1
map
最终被调用了 3 次,我不明白为什么,因为我订阅了 o1
,这应该在 map
发生之后发生。我肯定错过了什么。任何帮助将不胜感激。
谢谢
颜
来自评论:
您订阅了 o1
三次,每次都创建一个独立的序列,直到 PublishSubject
会将 onNext
分派给所有 3 个链。
从所有 3 个订阅者的角度来看,PublishSubject
是一个多播源,它通过 subscribe()
调用建立的独立链向他们发送事件信号。
在 Subject
上应用运算符通常不会使整个链变热,因为这些运算符元素只有在订阅时才会附加到源 Subject
。因此,多个订阅将产生到相同上游的多个频道 Subject
.
使用 publish
得到一个 ConnectableObservable
(或最后的另一个 PublishSubject
)使序列从那一点开始变得热。
我有以下使用 PublishSubject
.
val subject = PublishSubject.create<Int>()
val o1: Observable<String> =
subject.observeOn(Schedulers.newThread()).map { i: Int ->
println("${Thread.currentThread()} | ${Date()} | map => $i")
i.toString()
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (1) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (2) => $it")
}
o1.subscribe {
println("${Thread.currentThread()} | ${Date()} | direct subscription (3) => $it")
}
println("${Thread.currentThread()} | ${Date()} | submitting 1")
subject.onNext(1)
1) 我从中创建一个 Observable
并映射它(出于本示例的目的,我只是转换为 String
)=> o1
.
2) 然后我订阅了o1
3次。
3) 最后我通过调用 subject.onNext(1)
.
令我惊讶的是,我得到了以下输出:
Thread[main,5,main] | Mon Jun 19 09:46:37 PDT 2017 | submitting 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | map => 1
Thread[RxNewThreadScheduler-1,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (1) => 1
Thread[RxNewThreadScheduler-2,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (2) => 1
Thread[RxNewThreadScheduler-3,5,main] | Mon Jun 19 09:46:37 PDT 2017 | direct subscription (3) => 1
map
最终被调用了 3 次,我不明白为什么,因为我订阅了 o1
,这应该在 map
发生之后发生。我肯定错过了什么。任何帮助将不胜感激。
谢谢 颜
来自评论:
您订阅了 o1
三次,每次都创建一个独立的序列,直到 PublishSubject
会将 onNext
分派给所有 3 个链。
从所有 3 个订阅者的角度来看,PublishSubject
是一个多播源,它通过 subscribe()
调用建立的独立链向他们发送事件信号。
在 Subject
上应用运算符通常不会使整个链变热,因为这些运算符元素只有在订阅时才会附加到源 Subject
。因此,多个订阅将产生到相同上游的多个频道 Subject
.
使用 publish
得到一个 ConnectableObservable
(或最后的另一个 PublishSubject
)使序列从那一点开始变得热。