在 RxJs 中跟踪异步操作

Tracking asynchronous operation in RxJs

我有一个具有异步步骤的订阅 - api 调用。当 api 通话正在进行时,我想显示加载动画。它可以很容易地 done 当流创建访问 UI:

Rx.Observable
    .combineLatest(page$, sorting$)
    .switchMap(function(args) {
        var page = args[0];
        var sorting = args[1];

        $('.overlay').show();

        return loadData(page, sorting);
    })
    .subscribe(function(data) {
        renderData(data);
        $('.overlay').hide();
    });

但是当我将流管理代码移动到 service 时(例如为了代码重用)跟踪异步操作的能力丢失并且我无法显示加载动画。

有什么办法可以做到吗? Return 来自服务的两个流?

提前致谢。

您可以将额外的通知对象传递给 setupLoading:

setupLoading: function(page$, sorting$, notify$) {
  return Rx.Observable
    .combineLatest(page$, sorting$)
    .switchMap(function(args) {
      notify$.next(true); // here we go...
      var page = args[0];
      var sorting = args[1];

      return loadData(page, sorting);
    });
}

然后创建,除了page$sorting$loadStarted$ observable。

loadStarted$.subscribe(() => $('.overlay').hide();

并且您的加载程序创建将是:

service.setupLoading(page$, sorting$, loadStarted$)
  .subscribe(function(data) {
     renderData(data);
  });
});

顺便说一句,既然你很好地使用了 rxjs,为什么不将它用于 show/hide - angular2 方式 - 并放弃 jquery:

<div class="overlay" [style.display]="(showOverlay ? 'block' : 'none')>your overlay content here</div>

为什么不创建一个包含多个 'events' - startedfinished 的流,如下所示:

function loadDataWithStartEvent($page, sorting$) {
  return Rx.Observable
    .combineLatest(page$, sorting$)
    .flatMap(args => {
      var page = args[0];
      var sorting = args[1];

      return Rx.Observable.just({ event : 'started' })
        .concat(
          // given that loadData returns an Observable<data> which only emits data one time
          loadData(pageSortingTuple.page, pageSortingTuple.sorting))
            .map(data => ({ event: 'finished', data: data}))
        );
    });    
}

现在您订阅如下:

loadDataWithStartEvent(page$, sorting$)
  .doOnNext(evt => {
    if(evt.type == 'started') {
      $('overlay').show();
    } else {
      $('overlay').hide();
    }
  })
  .filter(evt => evt.type === 'finished')
  .map(evt => evt.data);
  .subscribe(data => renderData(data));

最终我使用了自定义运算符:

class RequestState {
  loading: boolean;
}


Observable.prototype.captureState = function captureState<T>(this: Observable<T>, state: RequestState): Observable<T> {
  state.loading = true;

  let onLoadingEnd = () => {
    state.loading = false;
  };

  obs = obs.do(onLoadingEnd, onLoadingEnd, onLoadingEnd);

  return obs;
};


class Component {
  requestState = new RequestState();

  ngOnInit() {
    this.api.endpoint(query)
      .captureState(this.requestState)
      .subscribe(...);
  }
}