RxJS - .subscribe() 与 .publish().connect()

RxJS - .subscribe() vs .publish().connect()

这主要是一个 RxJs 最佳 practice/approach 问题,因为我的 POC 代码有效,但我是 RxJs 的新手。

问题归结为 .subscribe().publish().connect(),因为他们似乎都在做同样的事情。

在我的 angular2 应用程序中,我有一个按钮调用一个函数来注销用户,它在我的服务中调用一个函数来执行一些服务器端操作,returns 我 URL将用户重定向到。为了发起请求,我调用 .subscribe() 使可观察对象开始产生值。我正在阅读一篇关于 "Cold vs Hot Observables" 的文章,另一种方法是调用 .publish().connect() 而不是 .subscribe()。这两种方法都有什么好处吗?

<a (click)="logout()">Logout</a>

注销函数如下所示:

logout.component.ts

logout() { this.authService.logout(); }

服务(实际注销)如下所示:

auth.service.ts

logout() : Observable<boolean>  {
        this.http.get(this.location.prepareExternalUrl('api/v1/authentication/logout'))
            .map(this.extractData)
            .catch(this.handleError)
            .do((x: string) => { window.location.href = x; })
            .subscribe();    // Option A - 

        return Observable.of(true);

    }

auth.service.alternative.ts

logout() : Observable<boolean>  {
        this.http.get(this.location.prepareExternalUrl('api/v1/authentication/logout'))
            .map(this.extractData)
            .catch(this.handleError)
            .do((x: string) => { window.location.href = x; })
            .publish()  // Option B - Make connectable observable
            .connect(); // Option B - Cause the connectable observable to subscribe and produce my value       

        return Observable.of(true);
    }

这稍微回避了您的问题,但您可能会发现它有帮助:

我不会 return 一个与调用 http 服务的不同的可观察流,因为这样做会使调用函数无法:

  • 取消直播
  • 修改流
  • 判断操作是否成功

相反,我会这样做:

auth.servive.ts

logout() : Observable<string>  {
       return this.http.get(...).map(this.extractData)          
            .catch(this.handleError);
}

现在调用代码可以对结果做任何它想做的事情 url

logout.component.ts

logout(){
    this.authService.logout().subscribe(
        url => window.location.href = url,
        err => {
            /*todo: handle if error was thrown by authService.handleError*/
        }
    );
}

subscribe().publish().connect() 之间的区别在于它们订阅其源 Observable 的时间。考虑以下 Observable:

let source = Observable.from([1, 2, 3])

这个 Observable 会在订阅时立即将所有值发送给 Observer。因此,如果我有两个观察者,那么他们将按顺序接收所有值:

source.subscribe(val => console.log('obs1', val));
source.subscribe(val => console.log('obs2', val));

这将打印到控制台:

obs1 1
obs1 2
obs1 3
obs2 1
obs2 2
obs2 3

另一方面调用 .publish() returns ConnectableObservable。这个 Observable 没有在它的构造函数中订阅它的源(source 在我们的例子中)并且只保留它的引用。然后你可以为它订阅多个观察者,什么也不会发生。最后,您调用 connect() 并且 ConnectableObservable 订阅开始发射值的 source。这次已经有两个 Observers 订阅了,所以它会一个一个地向它们发送值:

let connectable = source.publish();
connectable.subscribe(val => console.log('obs1', val));
connectable.subscribe(val => console.log('obs2', val));
connectable.connect();

打印到控制台:

obs1 1
obs2 1
obs1 2
obs2 2
obs1 3
obs2 3

观看现场演示:http://plnkr.co/edit/ySWocRr99m1WXwsOGfjS?p=preview