RX 合并多个 observable

RX merge multiple observables

在我的代码片段中,我发出了一个 api 请求,一切正常。我现在正在查看响应以查看服务器上可用的项目总数是否大于页面大小定义的 returned 结果集。如果是,我想进行额外的 api 调用,直到检索到所有结果并将 return 作为一个响应发送给订阅者。我需要使用哪个 RX 运算符来完成此操作,以及如何暂停下面响应的 return 直到后续 api 调用完成?

    getAction<T>(path: string, params?: {}): Observable<T> {
        return this._http.get("url")
            .map(res => {
                let response = res.json();
                // If more pages available make additional api calls & return as single result
                return response;
            });
    }

您应该使用 switchmap 从另一个 observable 获取响应,直到您拥有所有数据。只是连接所有的响应和 return 它作为最后一个可观察的响应。类似于:

//emit immediately, then every 5s
const source = Rx.Observable.timer(0, 5000);

//switch to new inner observable when source emits, emit items that are emitted
const example = source.switchMap(() => Rx.Observable.interval(500));

//output: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8
const subscribe = example.subscribe(val => console.log(val));

看看expand

要递归地获取多页数据,您可以这样做:

class MyExample {
  search(offset) {
    return this.http.get(`/search?offset=${offset}`);
  }

  searchAll() {
    return this.search(0)
               .expand(results => {
                 if (loadNextPage(results)) {
                   return this.search(results.nextPageOffset);
                 } else {
                   return Observable.empty();
                 }
               });
  }
}

expand 允许您根据以前的结果做一些处理(比如检查是否有更多页面),并指定一个 Observable 有更多的结果。所有这些调用的结果将被串联起来,无需担心自己携带它们。