即使没有活跃订阅者,我的 Observable 是否会永远使用 shareReplay() 运行?

Will my Observable using shareReplay() run forever, even without active subscribers?

考虑这个 Observable:

myObs$ = interval(1000).pipe(shareReplay(2));

及其用法:

myObs$.subscribe(res => console.log('[Subscriber 1]:' + res));

如果我订阅了,然后过几秒退订,再过几秒又重新订阅,好像间隔保持运行并且还在计数:

[Subscriber 1]: 0
[Subscriber 1]: 1 
Unsubscribe, resubscribe here
[Subscriber 1]: 3
[Subscriber 1]: 4
[Subscriber 1]: 5
[Subscriber 1]: 6 
Unsubscribe, resubscribe here
[Subscriber 1]: 10
[Subscriber 1]: 11
[Subscriber 1]: 12

我了解到使用 refCount = true 时不会发生这种情况。 但是当它为 false 时,这是否算作潜在的内存泄漏?如果不是,我该如何阻止它?

此外,为什么取消订阅后需要重新创建我的订阅?

sub = new Subscription()
sub.add(myObs.subscribe())
sub.unsubscribe()
sub.add(myObs.subscribe()) // <-- this does not work unless I recreate a new Subscription()

在没有 refCount 的情况下使用 shareReply 会导致内存泄漏,因为操作员不会在每个人都取消订阅后自动关闭流。您可以使用接受另一个流的 takeUntil 运算符来完成一个长期存在的流,它在与 Subject.

一起使用时非常有用
const sub = new Subject();

const stream$ = interval(1000).pipe(
  takeUntil(sub),
  shareReplay(1)
);

setTimeout(() => stream$.subscribe(console.log), 4000);
setTimeout(() => sub.next(), 6000);
setTimeout(() => stream$.subscribe(undefined, undefined, () => console.log('completed')), 8000); // this will log completed because the stream$ has successfully finished

另一种完成流的方法是使用 takeWhile 这也非常方便,它使用谓词函数。

如果您处置一个订阅,它会被标记为关闭,您需要创建一个新订阅,它不能再次使用。

查看 this line 添加方法内部的检查,如果 closed 为真,则对提供的订阅执行拆卸。