take() 和 takeUntil() RxJS 会导致内存泄漏吗?
Do the take() and takeUntil() RxJS cause a memory leak?
从 RxJS documentation 我看到以下示例:
const source = interval(1000);
const clicks = fromEvent(document, 'click');
const result = source.pipe(takeUntil(clicks));
result.subscribe(x => console.log(x));
这接近于我的应用程序所需的代码模式,但我发现了一个问题。 takeUntil
运算符订阅,但据我了解,Observer
无法从源 Observable
取消订阅。它无法访问可以调用 unsubscribe()
.
的 Subscription
对象
因此,如果我理解正确,那么一旦用户单击 source
observable 将继续向 takeUntil
发出滴答声,takeUntil
将消耗它们而不对它们进行任何操作。
我没看错吗?如果是这样,是否有一种普遍接受的方法来杀死 Observer
管道内可观察到的 source
?
takeUntil
的情况如下。
当Observable作为参数传给takeUntil
时,Observable的subscriber
由takeUntil
[=17]返回 =]s,因此,在 pipe
链中创建的所有订阅都以相反的顺序一个接一个地取消订阅。
简而言之,取消订阅是由 RxJs 内部机制在幕后执行的。
要证明此行为,您可以尝试此代码
const source = interval(1000).pipe(
tap({ next: (val) => console.log('source value', val) })
);
const clicks = fromEvent(document, 'click');
const result = source.pipe(takeUntil(clicks));
result.subscribe((x) => console.log(x));
如果你 运行 它,你会看到消息 'source value',val 被打印直到 click
出现。此后,控制台不再打印任何消息,这意味着上游的 Observable,即 interval
函数创建的 Observable 不再通知。
您可以在this stackblitz中尝试上面的代码。
一些内部细节
我们可以看看 RxJs 实现的内部结构,看看这个取消订阅在幕后是如何工作的。
让我们从takeUntil
开始。在它的实现中我们看到这样一行
innerFrom(notifier).subscribe(new OperatorSubscriber(subscriber, () => subscriber.complete(), noop));
本质上说,一旦 notifier
(即作为参数传递给 takeUntil
的 Observable)发出通知,complete
方法就会在 subscriber
.
complete
方法的调用触发了很多事情,但最终它最终调用了Subscription
的方法execTeardown
,最终调用了unsubscribe
的方法30=] 本身调用 unsubscribe
of Subscription
.
正如我们所见,这个链条很长而且很难遵循,但核心信息是 tearDown 逻辑(即当 Observable 完成时调用的逻辑,错误或取消订阅)调用 取消订阅 逻辑。
也许再看一件事很有用,直接实现自定义运算符 from the RxJs documentation。
在这种情况下,在运算符定义的最后,我们找到这段代码
// Return the teardown logic. This will be invoked when
// the result errors, completes, or is unsubscribed.
return () => {
subscription.unsubscribe();
// Clean up our timers.
for (const timerID of allTimerIDs) {
clearTimeout(timerID);
}
};
这是此自定义运算符的拆卸逻辑,此类逻辑会调用 unsubscribe
以及任何其他清理 activity。
从 RxJS documentation 我看到以下示例:
const source = interval(1000);
const clicks = fromEvent(document, 'click');
const result = source.pipe(takeUntil(clicks));
result.subscribe(x => console.log(x));
这接近于我的应用程序所需的代码模式,但我发现了一个问题。 takeUntil
运算符订阅,但据我了解,Observer
无法从源 Observable
取消订阅。它无法访问可以调用 unsubscribe()
.
Subscription
对象
因此,如果我理解正确,那么一旦用户单击 source
observable 将继续向 takeUntil
发出滴答声,takeUntil
将消耗它们而不对它们进行任何操作。
我没看错吗?如果是这样,是否有一种普遍接受的方法来杀死 Observer
管道内可观察到的 source
?
takeUntil
的情况如下。
当Observable作为参数传给takeUntil
时,Observable的subscriber
由takeUntil
[=17]返回 =]s,因此,在 pipe
链中创建的所有订阅都以相反的顺序一个接一个地取消订阅。
简而言之,取消订阅是由 RxJs 内部机制在幕后执行的。
要证明此行为,您可以尝试此代码
const source = interval(1000).pipe(
tap({ next: (val) => console.log('source value', val) })
);
const clicks = fromEvent(document, 'click');
const result = source.pipe(takeUntil(clicks));
result.subscribe((x) => console.log(x));
如果你 运行 它,你会看到消息 'source value',val 被打印直到 click
出现。此后,控制台不再打印任何消息,这意味着上游的 Observable,即 interval
函数创建的 Observable 不再通知。
您可以在this stackblitz中尝试上面的代码。
一些内部细节
我们可以看看 RxJs 实现的内部结构,看看这个取消订阅在幕后是如何工作的。
让我们从takeUntil
开始。在它的实现中我们看到这样一行
innerFrom(notifier).subscribe(new OperatorSubscriber(subscriber, () => subscriber.complete(), noop));
本质上说,一旦 notifier
(即作为参数传递给 takeUntil
的 Observable)发出通知,complete
方法就会在 subscriber
.
complete
方法的调用触发了很多事情,但最终它最终调用了Subscription
的方法execTeardown
,最终调用了unsubscribe
的方法30=] 本身调用 unsubscribe
of Subscription
.
正如我们所见,这个链条很长而且很难遵循,但核心信息是 tearDown 逻辑(即当 Observable 完成时调用的逻辑,错误或取消订阅)调用 取消订阅 逻辑。
也许再看一件事很有用,直接实现自定义运算符 from the RxJs documentation。
在这种情况下,在运算符定义的最后,我们找到这段代码
// Return the teardown logic. This will be invoked when
// the result errors, completes, or is unsubscribed.
return () => {
subscription.unsubscribe();
// Clean up our timers.
for (const timerID of allTimerIDs) {
clearTimeout(timerID);
}
};
这是此自定义运算符的拆卸逻辑,此类逻辑会调用 unsubscribe
以及任何其他清理 activity。