为什么结果不共享?

Why result does not get shared?

我有以下代码片段:

const source$ = Rx.Observable.from([1,2,3,4])
  .filter(x => x % 2)
  .map(x => x * x)
  .share();

source$.subscribe(x => console.log(`Stream 1 ${x}`));
source$.subscribe(x => console.log(`Stream 2 ${x}`));

作为我得到的结果

但我排除了以下共享结果:

"Stream 1 1"
"Stream 2 1"
"Stream 1 9"
"Stream 2 9"

为什么结果没有被分享?

这是因为您使用的是冷 Observable (http://reactivex.io/documentation/observable.html)。

当你第一次订阅时,它使用 refCount() 运算符并订阅它的源 Observable,即 Observable.from()。这一切都是同步发生的,所以它向订阅者发出它的所有值,然后发出完成,这使得 refCount() 从源取消订阅,因为没有其他观察者。

然后你订阅了第二个观察者,这一切又发生了。

如果您想获得预期的结果,您可以只使用 publish() 将源转换为 Connectable observable 并手动调用 connect()

const source$ = Rx.Observable.from([1,2,3,4])
  .filter(x => x % 2)
  .map(x => x * x)
  .publish();

source$.subscribe(x => console.log(`Stream 1 ${x}`));
source$.subscribe(x => console.log(`Stream 2 ${x}`));

source$.connect();

观看现场演示:https://jsbin.com/waraqi/2/edit?js,console