如何在订阅方法中取消订阅 RXJS 订阅?

how to unsubscribe a RXJS subscription inside the subscribe method?

我有一些 javascript:

this.mySubscription = someObservable.subscribe((obs: any) => {
   this.mySubscription.unsubscribe();
   this.mySubscription = undefined;
}

执行时,控制台记录错误 ERROR TypeError: Cannot read property 'unsubscribe' of undefined。 我想知道为什么我不能在订阅 lambda 函数中取消订阅。有正确的方法吗?我已经阅读了一些关于使用虚拟主题并完成它们或使用 takeUntil/takeWhile 和其他管道操作员变通方法的内容。

在订阅的订阅功能中取消订阅的正确way/workaround是什么?

我目前正在使用这样的虚拟订阅:

mySubscription: BehaviorSubject<any> = new BehaviorSubject<any>(undefined);


// when I do the subscription:
dummySubscription: BehaviorSubject<any> = new BehaviourSubject<any>(this.mySubscription.getValue());
this.mySubscription = someObservable.subscribe((obs: any) => {
    // any work...
    dummySubscription.next(obs);
    dummySubscription.complete();
    dummySubscription = undefined;
}, error => {
    dummySubscription.error(error);
});

dummySubscription.subscribe((obs: any) => {
    // here the actual work to do when mySubscription  emits a value, before it should have been unsubscribed upon
}, err => {
    // if errors need be
});

您不应该尝试在 subscribe 功能中退订。
您可以使用 taketakeWhiletakeUntil.

等运算符取消订阅

someObservable 发出 n 次后使用 take(n) 取消订阅。

someObservable.pipe(
  take(1)
).subscribe(value => console.log(value));

takeWhile

当发出的值不符合条件时,使用 takeWhile 取消订阅。

someObservable.pipe(
  takeWhile(value => valueIsSave(value))
).subscribe(value => console.log(value));

valueIsSave(value): boolean {
  // return true if the subscription should continue
  // return false if you want to unsubscribe on that value
}

takeUntil

当 Observable obs$ 发出时使用 takeUntil(obs$) 取消订阅。

const terminate = new Subject();

someObservable.pipe(
  takeUntil(terminate)
).subscribe(value => console.log(value));

unsub() { 
  terminate.next() // trigger unsubscribe
}

如果您使您的流异步,您正在做的事情应该会起作用。例如,这将不起作用:

const sub = from([1,2,3,4,5,6,7,8,9,10]).subscribe(val => {
  console.log(val);
  if(val > 5) sub.unsubscribe();
});

但这会起作用:

const sub2 = from([1,2,3,4,5,6,7,8,9,10]).pipe(
  delay(0)
).subscribe(val => {
  console.log(val);
  if(val > 5) sub2.unsubscribe();
});

因为 JS 事件循环是相当可预测的(代码块总是 运行 完成),如果你的流的任何部分是异步的,那么你可以确定你的订阅将在你的 lambda 之前定义调用回调。

你应该这样做吗?

可能不会。如果您的代码依赖于 language/compiler/interpreter/etc 的内部(否则是隐藏的)阴谋,那么您已经创建了难以维护的脆弱代码 and/or 代码。下一个查看我的代码的开发人员会对为什么会有 delay(0) 感到困惑 - 看起来它不应该做任何事情。

请注意,在 subscribe() 中,您的 lambda 可以访问其闭包以及当前流变量。 takeWhile() 运算符可以访问相同的闭包和相同的流变量。

from([1,2,3,4,5,6,7,8,9,10]).pipe(
  takeWhile(val => {
    // add custom logic
    return val <= 5;
  })
).subscribe(val => {
  console.log(val);
});

takeWhile() 可以做到 sub = subscribe(... sub.unsubscibe() ... ) 的任何事情,并且具有不需要您管理订阅对象并且更容易 read/maintain.

的额外好处

受此处另一个答案的启发,尤其是本文 https://medium.com/@benlesh/rxjs-dont-unsubscribe-6753ed4fda87,我想建议 takeUntil() 使用以下示例:

...
    let stop$: Subject<any> = new Subject<any>(); // This is the one which will stop the observable ( unsubscribe a like mechanism )

    obs$
      .pipe(
        takeUntil(stop$)
      )
      .subscribe(res => {
        if ( res.something === true ) {
          // This next to lines will cause the subscribe to stop
          stop$.next();
          stop$.complete();
        }

      });
...

我想引用上面提到的那些文章标题中的句子 RxJS:不要取消订阅 :)。