Angular 2 的 RxJS 5:按先前结果确定的时间表重播订阅

RxJS 5 with Angular 2: replay subscription on schedule determined by previous result

在我的 Angular 2 typescript 2 应用程序中,我向服务器查询需要定期更新的值。更新之间的延迟是可变的(服务器发送过期日期和值)。

我在编写 Observable 流时遇到问题,该流将在当前值到期后自动重播(启动对服务器的新调用)。到目前为止,我所拥有的根本无法扩展:

price = 5; //initial value is known
expires = ...;//initial expiration is known

getData(){
    // server returns {expires:number, price:number}
    this.http.get('...').map(res => res.json())
}

Observable.timer(expires-Date.now()) // when initial price expires
    .switchMap(()=>this.getData()) // fetch new price and expiration
    .subscribe( data =>
        {
            this.price = data.price;
            Observable.timer(data.expires-Date.now()) //redo when price expires
                .switchMap(()=>getData())
                .subscribe(...) //callback hell (endless inner blocks)
        }
    );

必须有更好的方法来安排跟进电话

必要时首先调用 doObservableStuff

getData(){
    // server returns {expires:number, price:number}
    this.http.get('...').map(res => res.json())
    .subscribe( data =>
        {
            this.price = data.price;
            doObservableStuff(data.expires-Date.now())
        });
}

doObservableStuff(time){
    Observable.timer(time)
    .switchMap(() => this.getData())
}

我的方法是使用 switchMap()timer() 来延迟可观察链的启动,并且 repeat() 在之前的价格到期后继续寻找新数据:

price = 0;    //initial price
_exp  = 0; //initial delay before fetching data

/** Observable will emit after the expiration delay */
wait() {return Observable.timer(this._exp*1000)}

stream$ = Observable.of(null)
    .switchMap(()=>this.wait()) // wait for the expiration delay                
    .switchMap(()=>this.getServerData()) // get fresh data
    .do(e=>{this._exp = e.expires}) //update expiration             
    .repeat() // repeat until the calling code unsubscribes

当我订阅时,第一个价格被立即获取,并且序列无限重复,每个周期延迟 expires 秒。我可以在价格到达时更新模型:

ngOnInit(){
    this.stream$.subscribe( e=>this.price = e.price);
}

Live demo