take() 和 takeUntil() RxJS 会导致内存泄漏吗?

Do the take() and takeUntil() RxJS cause a memory leak?

从 RxJS documentation 我看到以下示例:

const source = interval(1000);
const clicks = fromEvent(document, 'click');
const result = source.pipe(takeUntil(clicks));
result.subscribe(x => console.log(x));

这接近于我的应用程序所需的代码模式,但我发现了一个问题。 takeUntil 运算符订阅,但据我了解,Observer 无法从源 Observable 取消订阅。它无法访问可以调用 unsubscribe().

Subscription 对象

因此,如果我理解正确,那么一旦用户单击 source observable 将继续向 takeUntil 发出滴答声,takeUntil 将消耗它们而不对它们进行任何操作。

我没看错吗?如果是这样,是否有一种普遍接受的方法来杀死 Observer 管道内可观察到的 source

takeUntil 的情况如下。

当Observable作为参数传给takeUntil时,Observable的subscribertakeUntil[=17]返回 =]s,因此,在 pipe 链中创建的所有订阅都以相反的顺序一个接一个地取消订阅。

简而言之,取消订阅是由 RxJs 内部机制在幕后执行的。

要证明此行为,您可以尝试此代码

const source = interval(1000).pipe(
  tap({ next: (val) => console.log('source value', val) })
);
const clicks = fromEvent(document, 'click');
const result = source.pipe(takeUntil(clicks));
result.subscribe((x) => console.log(x));

如果你 运行 它,你会看到消息 'source value',val 被打印直到 click 出现。此后,控制台不再打印任何消息,这意味着上游的 Observable,即 interval 函数创建的 Observable 不再通知。

您可以在this stackblitz中尝试上面的代码。

一些内部细节

我们可以看看 RxJs 实现的内部结构,看看这个取消订阅在幕后是如何工作的。

让我们从takeUntil开始。在它的实现中我们看到这样一行

innerFrom(notifier).subscribe(new OperatorSubscriber(subscriber, () => subscriber.complete(), noop));

本质上说,一旦 notifier(即作为参数传递给 takeUntil 的 Observable)发出通知,complete 方法就会在 subscriber.

complete方法的调用触发了很多事情,但最终它最终调用了Subscription的方法execTeardown,最终调用了unsubscribe的方法30=] 本身调用 unsubscribe of Subscription.

正如我们所见,这个链条很长而且很难遵循,但核心信息是 tearDown 逻辑(即当 Observable 完成时调用的逻辑,错误或取消订阅)调用 取消订阅 逻辑。

也许再看一件事很有用,直接实现自定义运算符 from the RxJs documentation

在这种情况下,在运算符定义的最后,我们找到这段代码

  // Return the teardown logic. This will be invoked when
  // the result errors, completes, or is unsubscribed.
  return () => {
    subscription.unsubscribe();
    // Clean up our timers.
    for (const timerID of allTimerIDs) {
      clearTimeout(timerID);
    }
  };

这是此自定义运算符的拆卸逻辑,此类逻辑会调用 unsubscribe 以及任何其他清理 activity。