为什么 rxjava2 共享运算符不多播?
Why is rxjava2 share operator not multicasting?
当我的所有订阅者在进入下一个发射之前收到相同的发射时,就会发生多播。但是当我使用共享命令时,我没有看到多播。我有一个昂贵的手术,我只想做一次。让我们看看这段代码:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}
fun doMultiplyBy2(){
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
ob1.share()
}
}
这里是实际输出:
expensive operation
1st subscriber: 2;
expensive operation
1st subscriber: 4;
expensive operation
1st subscriber: 6;
expensive operation
1st subscriber: 8;
expensive operation
1st subscriber: 10;
expensive operation
2nd subscriber: 2;
expensive operation
2nd subscriber: 4;
expensive operation
2nd subscriber: 6;
expensive operation
2nd subscriber: 8;
expensive operation
2nd subscriber: 10;
但为什么它在发送给所有订阅者之前重复昂贵的操作。它为每个订户重复昂贵的操作?我正在使用共享,所以我希望输出是这样的:
expensive operation
1st subscriber: 2;
2nd subscriber: 2;
expensive operation
1st subscriber: 4;
2nd subscriber: 4;
expensive operation
1st subscriber: 6;
2nd subscriber: 6;
expensive operation
1st subscriber: 8;
2nd subscriber: 8;
expensive operation
1st subscriber: 10;
2nd subscriber: 10;
还有什么有趣的是,我发现仅当我执行以下操作时才会出现预期的输出:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.publish()
fun doMultiplyBy2(){
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
ob1.connect()
}
}
从而使其成为可连接的可观察对象,然后手动连接。为什么共享不起作用?
更新:我想说清楚问题是什么:
share 应该与 publish().refCount() 相同,我还认为 share 会为我多播,但我没有看到它这样做。让我们看看不使用共享,而是使用手动发布和连接:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.publish()
fun doMultiplyBy2(){
//ob1 = ob1.share()
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
ob1.connect()
}
}
这个输出是:
expensive operation
1st subscriber: 2;
2nd subscriber: 2;
expensive operation
1st subscriber: 4;
2nd subscriber: 4;
expensive operation
1st subscriber: 6;
2nd subscriber: 6;
expensive operation
1st subscriber: 8;
2nd subscriber: 8;
expensive operation
1st subscriber: 10;
2nd subscriber: 10;
这正是我想要的。每次发射完成一次昂贵的操作。
不要将其更改为使用共享:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.publish().refCount()//or can use share()
fun doMultiplyBy2(){
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
}
}
产生以下输出:
expensive operation
1st subscriber: 2;
expensive operation
1st subscriber: 4;
expensive operation
1st subscriber: 6;
expensive operation
1st subscriber: 8;
expensive operation
1st subscriber: 10;
expensive operation
2nd subscriber: 2;
expensive operation
2nd subscriber: 4;
expensive operation
2nd subscriber: 6;
expensive operation
2nd subscriber: 8;
expensive operation
2nd subscriber: 10;
如果 publish().refCount() 不像普通的可观察对象那样进行多播,那么 publish().refCount() 的目的是什么。它或分享的意义何在??
如您所知,share
运算符与 publish().refCount()
相同。如您所知,Refcount
使 connectable observer
。所以你的代码是正确的。但是你有一个遗漏的东西是 Thread
。我想你能理解我想解释的内容。如果不告诉我!
像这样更改代码
val ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.subscribeOn(Schedulers.computation()).share()
// Add subscribeOn operator to change emitting thread from MAIN to WORK
fun doMultiplyBy2() {
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
}
doMultiplyBy2()
Thread.sleep(1000) // Waiting for ending to execute
产出
expensive operation
1st subscriber: 2;
2nd subscriber: 2;
expensive operation
1st subscriber: 4;
2nd subscriber: 4;
expensive operation
1st subscriber: 6;
2nd subscriber: 6;
expensive operation
1st subscriber: 8;
2nd subscriber: 8;
expensive operation
1st subscriber: 10;
2nd subscriber: 10;
refCount()
等同于refCount(1)
,意思是当有一个订阅者订阅时,observable就会开始执行。
所以在下面的例子中:
var ob1 = Observable.fromArray(1, 2, 3, 4, 5).map {
println("expensive operation")
it * 2
}
.publish().refCount(1) // or .share() or .publish().refCount()
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }
.subscribe { println(it) }
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }
.subscribe { println(it) }
事情是这样的:
Observable has 0 subscribers
Subscriber 1 subscribes
Observable has 1 subscriber; refCount(1) condition is satisfied
Observable EXECUTES EXPENSIVE OPERATION
Subscriber 1 executes onNext()
Subscriber 1 executes onComplete() and unsubscribe
Observable has 0 subscribers
Subscriber 2 subscribes
Observable has 1 subscriber; refCount(1) condition is satisfied
Observable EXECUTES EXPENSIVE OPERATION
Subscriber 2 executes onNext()
Subscriber 2 executes onComplete() and unsubscribe
Observable has 0 subscribers
如果你想让 Observable 等到有 2 个订阅者,你必须递增到 refCount(2)
。
所以在下面的例子中:
var ob1 = Observable.fromArray(1, 2, 3, 4, 5).map {
println("expensive operation")
it * 2
}
.publish().refCount(2) // <<<<< Increment to 2
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }
.subscribe { println(it) }
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }
.subscribe { println(it) }
事情是这样的:
Observable has 0 subscribers
Subscriber 1 subscribes
Observable has 1 subscriber
Subscriber 2 subscribes
Observable has 2 subscribers; refCount(2) condition is satisfied
Observable EXECUTES EXPENSIVE OPERATION
Subscriber 1 executes onNext()
Subscriber 2 executes onNext()
Subscriber 1 executes onComplete() and unsubscribe
Observable has 1 subscriber
Subscriber 2 executes onComplete() and unsubscribe
Observable has 0 subscribers
注意这一次,昂贵的操作只执行一次。概括地说,如果您需要将昂贵的操作多播到 N 个下游可观察对象,请调用 refCount(N)
当我的所有订阅者在进入下一个发射之前收到相同的发射时,就会发生多播。但是当我使用共享命令时,我没有看到多播。我有一个昂贵的手术,我只想做一次。让我们看看这段代码:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}
fun doMultiplyBy2(){
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
ob1.share()
}
}
这里是实际输出:
expensive operation
1st subscriber: 2;
expensive operation
1st subscriber: 4;
expensive operation
1st subscriber: 6;
expensive operation
1st subscriber: 8;
expensive operation
1st subscriber: 10;
expensive operation
2nd subscriber: 2;
expensive operation
2nd subscriber: 4;
expensive operation
2nd subscriber: 6;
expensive operation
2nd subscriber: 8;
expensive operation
2nd subscriber: 10;
但为什么它在发送给所有订阅者之前重复昂贵的操作。它为每个订户重复昂贵的操作?我正在使用共享,所以我希望输出是这样的:
expensive operation
1st subscriber: 2;
2nd subscriber: 2;
expensive operation
1st subscriber: 4;
2nd subscriber: 4;
expensive operation
1st subscriber: 6;
2nd subscriber: 6;
expensive operation
1st subscriber: 8;
2nd subscriber: 8;
expensive operation
1st subscriber: 10;
2nd subscriber: 10;
还有什么有趣的是,我发现仅当我执行以下操作时才会出现预期的输出:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.publish()
fun doMultiplyBy2(){
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
ob1.connect()
}
}
从而使其成为可连接的可观察对象,然后手动连接。为什么共享不起作用?
更新:我想说清楚问题是什么:
share 应该与 publish().refCount() 相同,我还认为 share 会为我多播,但我没有看到它这样做。让我们看看不使用共享,而是使用手动发布和连接:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.publish()
fun doMultiplyBy2(){
//ob1 = ob1.share()
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
ob1.connect()
}
}
这个输出是:
expensive operation
1st subscriber: 2;
2nd subscriber: 2;
expensive operation
1st subscriber: 4;
2nd subscriber: 4;
expensive operation
1st subscriber: 6;
2nd subscriber: 6;
expensive operation
1st subscriber: 8;
2nd subscriber: 8;
expensive operation
1st subscriber: 10;
2nd subscriber: 10;
这正是我想要的。每次发射完成一次昂贵的操作。
不要将其更改为使用共享:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.publish().refCount()//or can use share()
fun doMultiplyBy2(){
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
}
}
产生以下输出:
expensive operation
1st subscriber: 2;
expensive operation
1st subscriber: 4;
expensive operation
1st subscriber: 6;
expensive operation
1st subscriber: 8;
expensive operation
1st subscriber: 10;
expensive operation
2nd subscriber: 2;
expensive operation
2nd subscriber: 4;
expensive operation
2nd subscriber: 6;
expensive operation
2nd subscriber: 8;
expensive operation
2nd subscriber: 10;
如果 publish().refCount() 不像普通的可观察对象那样进行多播,那么 publish().refCount() 的目的是什么。它或分享的意义何在??
如您所知,share
运算符与 publish().refCount()
相同。如您所知,Refcount
使 connectable observer
。所以你的代码是正确的。但是你有一个遗漏的东西是 Thread
。我想你能理解我想解释的内容。如果不告诉我!
像这样更改代码
val ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.subscribeOn(Schedulers.computation()).share()
// Add subscribeOn operator to change emitting thread from MAIN to WORK
fun doMultiplyBy2() {
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
}
doMultiplyBy2()
Thread.sleep(1000) // Waiting for ending to execute
产出
expensive operation
1st subscriber: 2;
2nd subscriber: 2;
expensive operation
1st subscriber: 4;
2nd subscriber: 4;
expensive operation
1st subscriber: 6;
2nd subscriber: 6;
expensive operation
1st subscriber: 8;
2nd subscriber: 8;
expensive operation
1st subscriber: 10;
2nd subscriber: 10;
refCount()
等同于refCount(1)
,意思是当有一个订阅者订阅时,observable就会开始执行。
所以在下面的例子中:
var ob1 = Observable.fromArray(1, 2, 3, 4, 5).map {
println("expensive operation")
it * 2
}
.publish().refCount(1) // or .share() or .publish().refCount()
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }
.subscribe { println(it) }
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }
.subscribe { println(it) }
事情是这样的:
Observable has 0 subscribers
Subscriber 1 subscribes
Observable has 1 subscriber; refCount(1) condition is satisfied
Observable EXECUTES EXPENSIVE OPERATION
Subscriber 1 executes onNext()
Subscriber 1 executes onComplete() and unsubscribe
Observable has 0 subscribers
Subscriber 2 subscribes
Observable has 1 subscriber; refCount(1) condition is satisfied
Observable EXECUTES EXPENSIVE OPERATION
Subscriber 2 executes onNext()
Subscriber 2 executes onComplete() and unsubscribe
Observable has 0 subscribers
如果你想让 Observable 等到有 2 个订阅者,你必须递增到 refCount(2)
。
所以在下面的例子中:
var ob1 = Observable.fromArray(1, 2, 3, 4, 5).map {
println("expensive operation")
it * 2
}
.publish().refCount(2) // <<<<< Increment to 2
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }
.subscribe { println(it) }
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }
.subscribe { println(it) }
事情是这样的:
Observable has 0 subscribers
Subscriber 1 subscribes
Observable has 1 subscriber
Subscriber 2 subscribes
Observable has 2 subscribers; refCount(2) condition is satisfied
Observable EXECUTES EXPENSIVE OPERATION
Subscriber 1 executes onNext()
Subscriber 2 executes onNext()
Subscriber 1 executes onComplete() and unsubscribe
Observable has 1 subscriber
Subscriber 2 executes onComplete() and unsubscribe
Observable has 0 subscribers
注意这一次,昂贵的操作只执行一次。概括地说,如果您需要将昂贵的操作多播到 N 个下游可观察对象,请调用 refCount(N)