RxJs:在所有取消订阅时中止延迟和共享的可观察对象

RxJs: Abort a deferred and shared observable when all unsubscribe

我想创建一个运行长轮询操作的 rxjs Observable。 每次迭代都会发出中间结果。 当 isComplete returns 为真时,Observable 完成。

此函数应按如下方式运行

  1. 它应该只有在 至少一个订阅者
  2. 时才会开始
  3. 它应该允许多个订阅者共享结果
  4. 如果没有订阅者,它应该中止轮询并取消呼叫

以下代码可以正常运行并满足条件(1)和(2):

function longPollingAction(fetch: () => Promise<Response>, cancel: () => {}): Observable<Response> {
   return defer(() => { // defer to start running when there's a single subscriber
     return from(fetch()).pipe(
         expand(() => timer(1000).pipe(switchMap(fetch))),
         takeWhile<Response>(isComplete, false),
    );
   }).pipe(share()); // share to allow multiple subscribers
}

function isComplete(r: Response): boolean {
   // returns true if r is complete. 
}

如何修改此代码以满足 (3) 的要求?在当前的实现中,轮询停止了,但是我该如何调用 cancel?

使用定稿

您可以使用 finalize 调用取消。这可能是这样的:

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe( 
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(isComplete, false),
    finalize(cancel),
    share() // share to allow multiple subscribers
  );
}

function isComplete(r: Response): boolean {
   // returns true if r is complete. 
}

回调 complete

tap 操作员可以访问 nexterrorcomplete 发射。对于 callback: () => void,这已经足够了。

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe( 
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(isComplete, false),
    tap({
      complete: cancel
    }),
    share() // share to allow multiple subscribers
  );
}

function isComplete(r: Response): boolean {
   // returns true if r is complete. 
}

回调 unsubscribe

认为不存在这样的运算符,但我们可以很容易地创建一个。此运算符只会在取消订阅时触发回调。它会忽略 errorcomplete.

function onUnsubscribe<T>(
  fn: () => void
): MonoTypeOperatorFunction<T> {
  return s => new Observable(observer => {
    const bindOn = name => observer[name].bind(observer);
    const sub = s.subscribe({
      next: bindOn("next"),
      error: bindOn("error"),
      complete: bindOn("complete")
    });
   
    return {
      unsubscribe: () => {
        fn();
        sub.unsubscribe()
      }
    };
  });
}

那么你可以这样使用它:

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => void
): Observable<Response> {
  // defer to turn eager promise into lazy observable
  return defer(fetch).pipe( 
    expand(() => timer(1000).pipe(switchMap(fetch))),
    takeWhile<Response>(isComplete, false),
    onUnsubscribe(cancel),
    share() // share to allow multiple subscribers
  );
}

function isComplete(r: Response): boolean {
   // returns true if r is complete. 
}

由于 share 正在管理您的订阅并且共享只会退订一次 refCount < 1,因此在这种情况下调用取消的唯一方法是没有订阅者。

给猫剥皮的方法不止一种,但我会这样做:

const onUnsubscribe = (callback: () => void) => <T>(source$: Observable<T>) =>
  new Observable<T>(observer => {
    let isSourceDone = false;

    const subscription = source$.subscribe(
      val => {
        observer.next(val);
      },
      e => {
        isSourceDone = true;
        observer.error(e);
      },
      () => {
        isSourceDone = true;
        observer.complete();
      }
    );

    return () => {
      if (isSourceDone) return;
      callback();
      subscription.unsubscribe();
    };
  });

function longPollingAction(
  fetch: () => Promise<Response>,
  cancel: () => {}
): Observable<Response> {
  const lazyFetch$ = defer(() => fetch());
  return lazyFetch$.pipe(
    expand(() => timer(1000).pipe(mergeMapTo(lazyFetch$))),
    takeWhile<Response>(isComplete, false),
    onUnsubscribe(cancel),
    share()
  );
}

function isComplete(r: Response): boolean {
  // returns true if r is complete.
}