为什么 rxjava2 共享运算符不多播?

Why is rxjava2 share operator not multicasting?

当我的所有订阅者在进入下一个发射之前收到相同的发射时,就会发生多播。但是当我使用共享命令时,我没有看到多播。我有一个昂贵的手术,我只想做一次。让我们看看这段代码:

var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }

    fun doMultiplyBy2(){
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}

        ob1.share()
    }
}

这里是实际输出:

    expensive operation
 1st subscriber: 2;
expensive operation
 1st subscriber: 4;
expensive operation
 1st subscriber: 6;
expensive operation
 1st subscriber: 8;
expensive operation
 1st subscriber: 10;
expensive operation
 2nd subscriber: 2;
expensive operation
 2nd subscriber: 4;
expensive operation
 2nd subscriber: 6;
expensive operation
 2nd subscriber: 8;
expensive operation
 2nd subscriber: 10;

但为什么它在发送给所有订阅者之前重复昂贵的操作。它为每个订户重复昂贵的操作?我正在使用共享,所以我希望输出是这样的:

    expensive operation
 1st subscriber: 2;
 2nd subscriber: 2;
expensive operation
 1st subscriber: 4;
 2nd subscriber: 4;
expensive operation
 1st subscriber: 6;
 2nd subscriber: 6;
expensive operation
 1st subscriber: 8;
 2nd subscriber: 8;
expensive operation
 1st subscriber: 10;
 2nd subscriber: 10;

还有什么有趣的是,我发现仅当我执行以下操作时才会出现预期的输出:

var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }.publish()

    fun doMultiplyBy2(){
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}

        ob1.connect()
    }
}

从而使其成为可连接的可观察对象,然后手动连接。为什么共享不起作用?

更新:我想说清楚问题是什么:

share 应该与 publish().refCount() 相同,我还认为 share 会为我多播,但我没有看到它这样做。让我们看看不使用共享,而是使用手动发布和连接:

 var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }.publish()

    fun doMultiplyBy2(){
        //ob1 = ob1.share()
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}

        ob1.connect()
    }
}

这个输出是:

expensive operation
 1st subscriber: 2;
 2nd subscriber: 2;
expensive operation
 1st subscriber: 4;
 2nd subscriber: 4;
expensive operation
 1st subscriber: 6;
 2nd subscriber: 6;
expensive operation
 1st subscriber: 8;
 2nd subscriber: 8;
expensive operation
 1st subscriber: 10;
 2nd subscriber: 10;

这正是我想要的。每次发射完成一次昂贵的操作。

不要将其更改为使用共享:

var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }.publish().refCount()//or can use share()

    fun doMultiplyBy2(){
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
    }
}

产生以下输出:

    expensive operation
 1st subscriber: 2;
expensive operation
 1st subscriber: 4;
expensive operation
 1st subscriber: 6;
expensive operation
 1st subscriber: 8;
expensive operation
 1st subscriber: 10;
expensive operation
 2nd subscriber: 2;
expensive operation
 2nd subscriber: 4;
expensive operation
 2nd subscriber: 6;
expensive operation
 2nd subscriber: 8;
expensive operation
 2nd subscriber: 10;

如果 publish().refCount() 不像普通的可观察对象那样进行多播,那么 publish().refCount() 的目的是什么。它或分享的意义何在??

如您所知,share 运算符与 publish().refCount() 相同。如您所知,Refcount 使 connectable observer。所以你的代码是正确的。但是你有一个遗漏的东西是 Thread。我想你能理解我想解释的内容。如果不告诉我!

像这样更改代码

val ob1 = Observable.fromArray(1,2,3,4,5).map {
    println("expensive operation")
    it * 2
}.subscribeOn(Schedulers.computation()).share() 
// Add subscribeOn operator to change emitting thread from MAIN to WORK

fun doMultiplyBy2() {
    ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

    ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
}

doMultiplyBy2()

Thread.sleep(1000) // Waiting for ending to execute

产出

expensive operation
 1st subscriber: 2;
 2nd subscriber: 2;
expensive operation
 1st subscriber: 4;
 2nd subscriber: 4;
expensive operation
 1st subscriber: 6;
 2nd subscriber: 6;
expensive operation
 1st subscriber: 8;
 2nd subscriber: 8;
expensive operation
 1st subscriber: 10;
 2nd subscriber: 10;

refCount()等同于refCount(1),意思是当有一个订阅者订阅时,observable就会开始执行。

所以在下面的例子中:

var ob1 = Observable.fromArray(1, 2, 3, 4, 5).map {
  println("expensive operation")
  it * 2
}
    .publish().refCount(1) // or .share() or .publish().refCount()

ob1.flatMap { Observable.just(" 1st subscriber: $it;") }
   .subscribe { println(it) }
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }
   .subscribe { println(it) }

事情是这样的:

Observable has 0 subscribers  
Subscriber 1 subscribes  
Observable has 1 subscriber; refCount(1) condition is satisfied  
Observable EXECUTES EXPENSIVE OPERATION
Subscriber 1 executes onNext()  
Subscriber 1 executes onComplete() and unsubscribe  
Observable has 0 subscribers  
Subscriber 2 subscribes  
Observable has 1 subscriber; refCount(1) condition is satisfied  
Observable EXECUTES EXPENSIVE OPERATION
Subscriber 2 executes onNext()  
Subscriber 2 executes onComplete() and unsubscribe  
Observable has 0 subscribers  

如果你想让 Observable 等到有 2 个订阅者,你必须递增到 refCount(2)

所以在下面的例子中:

var ob1 = Observable.fromArray(1, 2, 3, 4, 5).map {
  println("expensive operation")
  it * 2
}
    .publish().refCount(2) // <<<<< Increment to 2

ob1.flatMap { Observable.just(" 1st subscriber: $it;") }
   .subscribe { println(it) }
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }
   .subscribe { println(it) }

事情是这样的:

Observable has 0 subscribers
Subscriber 1 subscribes
Observable has 1 subscriber
Subscriber 2 subscribes
Observable has 2 subscribers; refCount(2) condition is satisfied
Observable EXECUTES EXPENSIVE OPERATION
Subscriber 1 executes onNext()
Subscriber 2 executes onNext()
Subscriber 1 executes onComplete() and unsubscribe
Observable has 1 subscriber
Subscriber 2 executes onComplete() and unsubscribe
Observable has 0 subscribers

注意这一次,昂贵的操作只执行一次。概括地说,如果您需要将昂贵的操作多播到 N 个下游可观察对象,请调用 refCount(N)