为什么结果不共享?
Why result does not get shared?
我有以下代码片段:
const source$ = Rx.Observable.from([1,2,3,4])
.filter(x => x % 2)
.map(x => x * x)
.share();
source$.subscribe(x => console.log(`Stream 1 ${x}`));
source$.subscribe(x => console.log(`Stream 2 ${x}`));
作为我得到的结果
但我排除了以下共享结果:
"Stream 1 1"
"Stream 2 1"
"Stream 1 9"
"Stream 2 9"
为什么结果没有被分享?
这是因为您使用的是冷 Observable (http://reactivex.io/documentation/observable.html)。
当你第一次订阅时,它使用 refCount()
运算符并订阅它的源 Observable,即 Observable.from()
。这一切都是同步发生的,所以它向订阅者发出它的所有值,然后发出完成,这使得 refCount()
从源取消订阅,因为没有其他观察者。
然后你订阅了第二个观察者,这一切又发生了。
如果您想获得预期的结果,您可以只使用 publish()
将源转换为 Connectable observable 并手动调用 connect()
。
const source$ = Rx.Observable.from([1,2,3,4])
.filter(x => x % 2)
.map(x => x * x)
.publish();
source$.subscribe(x => console.log(`Stream 1 ${x}`));
source$.subscribe(x => console.log(`Stream 2 ${x}`));
source$.connect();
我有以下代码片段:
const source$ = Rx.Observable.from([1,2,3,4])
.filter(x => x % 2)
.map(x => x * x)
.share();
source$.subscribe(x => console.log(`Stream 1 ${x}`));
source$.subscribe(x => console.log(`Stream 2 ${x}`));
作为我得到的结果
但我排除了以下共享结果:
"Stream 1 1"
"Stream 2 1"
"Stream 1 9"
"Stream 2 9"
为什么结果没有被分享?
这是因为您使用的是冷 Observable (http://reactivex.io/documentation/observable.html)。
当你第一次订阅时,它使用 refCount()
运算符并订阅它的源 Observable,即 Observable.from()
。这一切都是同步发生的,所以它向订阅者发出它的所有值,然后发出完成,这使得 refCount()
从源取消订阅,因为没有其他观察者。
然后你订阅了第二个观察者,这一切又发生了。
如果您想获得预期的结果,您可以只使用 publish()
将源转换为 Connectable observable 并手动调用 connect()
。
const source$ = Rx.Observable.from([1,2,3,4])
.filter(x => x % 2)
.map(x => x * x)
.publish();
source$.subscribe(x => console.log(`Stream 1 ${x}`));
source$.subscribe(x => console.log(`Stream 2 ${x}`));
source$.connect();