RxJS 5 Observable 源能否被链下的另一个停止?
Can an RxJS 5 Observable source be stopped by another down the chain?
RxJS 5
Angular 2 RC4
应用 Typescript 1.9
:
我在一条链中有两个 observable。我想,如果第二个条件满足,第一个立即完成。我的努力似乎不必要地复杂。在下面的示例中,我尝试在发出 3 个值后停止第一个 observable:
source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.switchMap(count => {
if(count<3){ //just pass along the value
return Observable.create(observer=>{
observer.next(count);observer.complete()
})
}
else{ //abort by issuing a non-productive observable
return Observable.create(observer=>
observer.complete()
)
}
})
this.source.subscribe(count=>this.print('Ouput is '+count);
这是输出:
*******EMITTING from Source*******
Output is 0
*******EMITTING from Source*******
Output is 1
*******EMITTING from Source*******
Output is 2
*******EMITTING from Source*******
*******EMITTING from Source*******
*******EMITTING from Source*******
所以,在功能上我得到了我想要的结果,因为更广泛的脚本在三个输出后停止接收通知。但是,我确信有更好的方法。我的问题是:
- 上游 Observable 继续永远发射。我该如何阻止它?
- 我正在为每次发射创建一个新的可观察链。我不应该只传递前 3 个值,但在第 4 个值时中止或完成链吗?
您可以使用 take
operator 来完成。take
获取前 N 个事件并完成流。
this.source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.take(3);
this.source.subscribe(count=>this.print('Ouput is '+count);
您的示例流未完成,因为 switchMap
的外部流在内部流完成时未完成。 switchMap()
等于 map().switch()
。在您的示例中,map
部分发出 observables,例如:
- 下一个(0),完成()
- 下一个(1),完成()
- 下一个(2),完成()
- 完成()
- 完成()
- 完成()
- 完成()
- ...(无限继续)...
switch
部分切换这些可观察值并继续等待即将到来的可观察值。
编辑
你的例子也可以写成:
source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.takeWhile(count => count < 3);
编辑 2
关于你的评论,如果你想在内部流发出时终止流 true
:
source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.switchMap(count => createSomeObservable(count))
.takeWhile(x => x !== true);
RxJS 5
Angular 2 RC4
应用 Typescript 1.9
:
我在一条链中有两个 observable。我想,如果第二个条件满足,第一个立即完成。我的努力似乎不必要地复杂。在下面的示例中,我尝试在发出 3 个值后停止第一个 observable:
source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.switchMap(count => {
if(count<3){ //just pass along the value
return Observable.create(observer=>{
observer.next(count);observer.complete()
})
}
else{ //abort by issuing a non-productive observable
return Observable.create(observer=>
observer.complete()
)
}
})
this.source.subscribe(count=>this.print('Ouput is '+count);
这是输出:
*******EMITTING from Source*******
Output is 0
*******EMITTING from Source*******
Output is 1
*******EMITTING from Source*******
Output is 2
*******EMITTING from Source*******
*******EMITTING from Source*******
*******EMITTING from Source*******
所以,在功能上我得到了我想要的结果,因为更广泛的脚本在三个输出后停止接收通知。但是,我确信有更好的方法。我的问题是:
- 上游 Observable 继续永远发射。我该如何阻止它?
- 我正在为每次发射创建一个新的可观察链。我不应该只传递前 3 个值,但在第 4 个值时中止或完成链吗?
您可以使用 take
operator 来完成。take
获取前 N 个事件并完成流。
this.source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.take(3);
this.source.subscribe(count=>this.print('Ouput is '+count);
您的示例流未完成,因为 switchMap
的外部流在内部流完成时未完成。 switchMap()
等于 map().switch()
。在您的示例中,map
部分发出 observables,例如:
- 下一个(0),完成()
- 下一个(1),完成()
- 下一个(2),完成()
- 完成()
- 完成()
- 完成()
- 完成()
- ...(无限继续)...
switch
部分切换这些可观察值并继续等待即将到来的可观察值。
编辑
你的例子也可以写成:
source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.takeWhile(count => count < 3);
编辑 2
关于你的评论,如果你想在内部流发出时终止流 true
:
source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.switchMap(count => createSomeObservable(count))
.takeWhile(x => x !== true);