Angular 2 的 RxJS 5:按先前结果确定的时间表重播订阅
RxJS 5 with Angular 2: replay subscription on schedule determined by previous result
在我的 Angular 2
typescript 2
应用程序中,我向服务器查询需要定期更新的值。更新之间的延迟是可变的(服务器发送过期日期和值)。
我在编写 Observable 流时遇到问题,该流将在当前值到期后自动重播(启动对服务器的新调用)。到目前为止,我所拥有的根本无法扩展:
price = 5; //initial value is known
expires = ...;//initial expiration is known
getData(){
// server returns {expires:number, price:number}
this.http.get('...').map(res => res.json())
}
Observable.timer(expires-Date.now()) // when initial price expires
.switchMap(()=>this.getData()) // fetch new price and expiration
.subscribe( data =>
{
this.price = data.price;
Observable.timer(data.expires-Date.now()) //redo when price expires
.switchMap(()=>getData())
.subscribe(...) //callback hell (endless inner blocks)
}
);
必须有更好的方法来安排跟进电话
必要时首先调用 doObservableStuff
getData(){
// server returns {expires:number, price:number}
this.http.get('...').map(res => res.json())
.subscribe( data =>
{
this.price = data.price;
doObservableStuff(data.expires-Date.now())
});
}
doObservableStuff(time){
Observable.timer(time)
.switchMap(() => this.getData())
}
我的方法是使用 switchMap()
和 timer()
来延迟可观察链的启动,并且 repeat()
在之前的价格到期后继续寻找新数据:
price = 0; //initial price
_exp = 0; //initial delay before fetching data
/** Observable will emit after the expiration delay */
wait() {return Observable.timer(this._exp*1000)}
stream$ = Observable.of(null)
.switchMap(()=>this.wait()) // wait for the expiration delay
.switchMap(()=>this.getServerData()) // get fresh data
.do(e=>{this._exp = e.expires}) //update expiration
.repeat() // repeat until the calling code unsubscribes
当我订阅时,第一个价格被立即获取,并且序列无限重复,每个周期延迟 expires
秒。我可以在价格到达时更新模型:
ngOnInit(){
this.stream$.subscribe( e=>this.price = e.price);
}
在我的 Angular 2
typescript 2
应用程序中,我向服务器查询需要定期更新的值。更新之间的延迟是可变的(服务器发送过期日期和值)。
我在编写 Observable 流时遇到问题,该流将在当前值到期后自动重播(启动对服务器的新调用)。到目前为止,我所拥有的根本无法扩展:
price = 5; //initial value is known
expires = ...;//initial expiration is known
getData(){
// server returns {expires:number, price:number}
this.http.get('...').map(res => res.json())
}
Observable.timer(expires-Date.now()) // when initial price expires
.switchMap(()=>this.getData()) // fetch new price and expiration
.subscribe( data =>
{
this.price = data.price;
Observable.timer(data.expires-Date.now()) //redo when price expires
.switchMap(()=>getData())
.subscribe(...) //callback hell (endless inner blocks)
}
);
必须有更好的方法来安排跟进电话
必要时首先调用 doObservableStuff
getData(){
// server returns {expires:number, price:number}
this.http.get('...').map(res => res.json())
.subscribe( data =>
{
this.price = data.price;
doObservableStuff(data.expires-Date.now())
});
}
doObservableStuff(time){
Observable.timer(time)
.switchMap(() => this.getData())
}
我的方法是使用 switchMap()
和 timer()
来延迟可观察链的启动,并且 repeat()
在之前的价格到期后继续寻找新数据:
price = 0; //initial price
_exp = 0; //initial delay before fetching data
/** Observable will emit after the expiration delay */
wait() {return Observable.timer(this._exp*1000)}
stream$ = Observable.of(null)
.switchMap(()=>this.wait()) // wait for the expiration delay
.switchMap(()=>this.getServerData()) // get fresh data
.do(e=>{this._exp = e.expires}) //update expiration
.repeat() // repeat until the calling code unsubscribes
当我订阅时,第一个价格被立即获取,并且序列无限重复,每个周期延迟 expires
秒。我可以在价格到达时更新模型:
ngOnInit(){
this.stream$.subscribe( e=>this.price = e.price);
}