angular2 rxjs 共享订阅不触发

angular2 rxjs shared subscribe not firing

Angular2 2.0.0-beta.15,Rxjs 5.0.0-beta.2

我有两个简单的 rxjs observables,一个共享,一个不共享(.share() 调用它)

我有一个 angular2 组件,它订阅了两个 observables 并在其模板中显示了值,它还订阅了带有异步管道的 observables。

我点击一个按钮在两个可观察对象上设置 .next() 并带有一些垃圾。

模板更新并显示每个的最新值。 组件中的订阅函数不会为非共享可观察对象触发。为什么?

笨蛋: https://plnkr.co/edit/IIWvnTT1zLit1eUNknkk?p=preview

@Component({
    selector: 'my-app',
    template: `
        <div>
            <div>obs1 value: {{ observable1 | async }}</div>
            <div>obs2 value: {{ observable2 | async }}</div>
            <button (click)="randomizeIt()">randomize it</button>

            <br>
            <h3>Question 1</h3>
            Why is obs1's initial value (from startWith) not displayed?

            <h3>Question 2</h3>
            Why does our subscription to obs2 never fire on new values?<br>
            check console.log to see what i mean.
        </div>`,
})
export class App {

    public observable1:Observable<string>;
    public observer1:any;
    public observable2:Observable<string>;
    public observer2:any;

    constructor() {
        this.observable1 = Observable.create(observer => {
            this.observer1 = observer;
        }).startWith("initial obs 1 value").share();

        this.observable1.subscribe((val) => {
            console.log('YOU WILL SEE THIS:', val);
        })

        this.observable2 = Observable.create(observer => {
            this.observer2 = observer;
        }).startWith("initial obs 2 value"); // no share!

        this.observable2.subscribe((val) => {
            console.log('WHY DOES THIS NEVER FIRE ON NEW VALUES?:', val);
        })

    }

    public randomizeIt() {
        let r = Math.random();
        console.log('set both obs to: ', r);
        this.observer1.next(r);
        this.observer2.next(r);
    }   

}

提前致谢!

Q1。 为什么obs1的初始值(从startWith开始)不显示?

来自 share() 文档:

Returns an observable sequence that shares a single subscription to the underlying sequence.

这意味着,在您的情况下,{{ observable1 | async }}this.observable1.subscribe(....) 共享相同的订阅,并且当您在构造函数中订阅 observable1 时,您已经启动了序列。当视图初始化时,对 observable1 的订阅已经开始并且发出了第一个值。所以 async 将调用 subscribe(),但不会获得序列的第一个发射。

因此,如果您将订阅移动到 ngAfterViewInit() 内部,初始值将转到 async 订阅。

ngAfterViewInit(){
  this.observable1.subscribe((val) => {
        console.log('YOU WILL SEE THIS CHANGE:', val);
    })
}

Q2。 为什么我们对 obs2 的订阅永远不会触发新值?

每次你订阅一个cold Observable,你实际上创建了一个新的Observable实例。当您创建一个新实例时,您也会创建一个新的观察者实例。因此,您对构造函数的订阅是可观察的 instance1 和观察者 instance1,而 async 的订阅是可观察的 instance2 和观察者 instance2。因此,当您调用 randomizeIt() 时,您调用 observer2 instance2 绑定到 async.

订阅的可观察 instance2

让您的可观察对象“热”:

let obs = Observable.create(observer => {
    this.observer2 = observer;
}).startWith("initial obs 2 value");
this.observable2 = obs.publish();
    
this.observable2.subscribe((val) => {
    console.log('WHY DOES THIS NEVER FIRE?:', val);
})
this.observable2.connect();

更新
假设您有一个发出随机值的 Observable:

this.observable3 = Observable.create(observer => {
    observer.next(Math.random());
});

现在,您每次订阅 this.observable3 都会获得一个新的随机值。每个订户的价值都不相同。因为每个订阅者都有一个新实例。

来自文档:

It helps to think of cold and hot Observables as movies or performances that one can watch ("subscribe").

Cold Observables: movies.
Hot Observables: live performances.
Hot Observables replayed: live performances recorded on video.

无论何时观看电影,您对电影的 运行 独立于其他人的 运行,即使所有电影观众看到的效果相同。另一方面,现场表演被分享给多个观众。如果您迟到了现场表演,您将错过一些表演。然而,如果它被录制在视频中(在 RxJS 中,这将发生在 BehaviorSubject 或 ReplaySubject 中),您可以观看现场表演的“电影”。 .publish().refCount() 现场表演是艺术家在没有人观看时停止演奏,并在观众中至少有一个人时重新开始演奏的表演。