RxJS 5 Observable 源能否被链下的另一个停止?

Can an RxJS 5 Observable source be stopped by another down the chain?

RxJS 5 Angular 2 RC4 应用 Typescript 1.9:

我在一条链中有两个 observable。我想,如果第二个条件满足,第一个立即完成。我的努力似乎不必要地复杂。在下面的示例中,我尝试在发出 3 个值后停止第一个 observable:

source = Observable.interval(1000)
        .do(()=>this.print("*******EMITTING from Source*******"))
        .switchMap(count => {
            if(count<3){ //just pass along the value
                return Observable.create(observer=>{
                    observer.next(count);observer.complete()
                })                
            }
            else{ //abort by issuing a non-productive observable
                return Observable.create(observer=>
                    observer.complete()
                )
            }
        })
this.source.subscribe(count=>this.print('Ouput is '+count);

这是输出:

*******EMITTING from Source*******
Output is 0
*******EMITTING from Source*******
Output is 1
*******EMITTING from Source*******
Output is 2
*******EMITTING from Source*******
*******EMITTING from Source*******
*******EMITTING from Source*******

所以,在功能上我得到了我想要的结果,因为更广泛的脚本在三个输出后停止接收通知。但是,我确信有更好的方法。我的问题是:

  1. 上游 Observable 继续永远发射。我该如何阻止它?
  2. 我正在为每次发射创建一个新的可观察链。我不应该只传递前 3 个值,但在第 4 个值时中止或完成链吗?

您可以使用 take operator 来完成。take 获取前 N 个事件并完成流。

this.source = Observable.interval(1000)
  .do(()=>this.print("*******EMITTING from Source*******"))
  .take(3);
this.source.subscribe(count=>this.print('Ouput is '+count);

您的示例流未完成,因为 switchMap 的外部流在内部流完成时未完成。 switchMap() 等于 map().switch()。在您的示例中,map 部分发出 observables,例如:

  1. 下一个(0),完成()
  2. 下一个(1),完成()
  3. 下一个(2),完成()
  4. 完成()
  5. 完成()
  6. 完成()
  7. 完成()
  8. ...(无限继续)...

switch 部分切换这些可观察值并继续等待即将到来的可观察值。

编辑

你的例子也可以写成:

source = Observable.interval(1000)
        .do(()=>this.print("*******EMITTING from Source*******"))
        .takeWhile(count => count < 3);

编辑 2

关于你的评论,如果你想在内部流发出时终止流 true:

source = Observable.interval(1000)
        .do(()=>this.print("*******EMITTING from Source*******"))
        .switchMap(count => createSomeObservable(count))
        .takeWhile(x => x !== true);