RXJS:从分页界面创建可观察流的惯用方法

RXJS : Idiomatic way to create an observable stream from a paged interface

我有分页界面。给定起点,请求将生成结果列表和持续指示器。

我创建了一个 observable,它是通过构造和平面映射一个读取页面的 observable 来构建的。这个 observable 的结果包含页面的数据和要继续的值。我提取数据并将其平面映射到订阅者。产生价值流。

为了处理分页,我为下一页值创建了一个主题。它以初始值播种,然后每次我收到包含有效下一页的响应时,我都会推送到页面主题并触发另一次读取,直到没有更多内容可读为止。

有没有更惯用的方法来做到这一点?

    function records(start = 'LATEST', limit = 1000) {
      let pages = new rx.Subject();

      this.connect(start)
        .subscribe(page => pages.onNext(page));

      let records = pages
        .flatMap(page => {
          return this.read(page, limit)
            .doOnNext(result => {
              let next = result.next;
              if (next === undefined) {
                pages.onCompleted();
              } else {
                pages.onNext(next);
              }
            });
        })
        .pluck('data')
        .flatMap(data => data);

      return records;
    }

这是一种合理的方法。它有几个潜在的缺陷(可能会或可能不会影响您,具体取决于您的用例):

  1. 您没有提供任何方法来观察 this.connect(start)
  2. 中发生的任何错误
  3. 您的观察结果实际上是 hot。如果调用者没有立即订阅可观察对象(也许他们存储它并稍后订阅),那么他们将错过 this.connect(start) 的完成并且可观察对象似乎永远不会生产任何东西。
  4. 如果来电者改变主意并提前取消订阅,您无法提供从初始 connect 电话中取消订阅的方法。没什么大不了的,但通常当一个人构造一个可观察的对象时,应该尝试将一次性对象链接在一起,以便在调用者取消订阅时正确调用清理。

这是修改后的版本:

  1. 它将错误从 this.connect 传递给观察者。
  2. 它使用 Observable.create 创建一个 cold observable,它只有在调用者实际订阅时才开始运行,因此不会丢失初始页面值并停止运行流。
  3. 它将this.connect订阅一次性与整体订阅一次性结合

代码:

    function records(start = 'LATEST', limit = 1000) {
        return Rx.Observable.create(observer => {
            let pages = new Rx.Subject();
            let connectSub = new Rx.SingleAssignmentDisposable();
            let resultsSub = new Rx.SingleAssignmentDisposable();
            let sub = new Rx.CompositeDisposable(connectSub, resultsSub);

            // Make sure we subscribe to pages before we issue this.connect()
            // just in case this.connect() finishes synchronously (possible if it caches values or something?)
            let results = pages
                .flatMap(page => this.read(page, limit))
                .doOnNext(r => this.next !== undefined ? pages.onNext(this.next) : pages.onCompleted())
                .flatMap(r => r.data);
            resultsSub.setDisposable(results.subscribe(observer));

            // now query the first page
            connectSub.setDisposable(this.connect(start)
                .subscribe(p => pages.onNext(p), e => observer.onError(e)));

            return sub;
        });
    }

注意:我以前没有使用过 ES6 语法,所以希望我没有弄乱这里的任何内容。