RxJS:你怎么知道使用时需要清理什么Observable.create

RxJS: how do you know what needs to be cleaned up when using Observable.create

我试图在创建自定义 Observable 后更好地理解清理。为了避免内存泄漏,您应该使用一次性功能进行清理。在这种情况下,我使用了计时器的 Rx 方法而不是 setInterval 并且我相信当我调用 observer.onCompleted() 时会在内部清理计时器并且我的一次性用品是无用的。我对这个过程的理解是否正确?

function countInterval(interval, times) {
  return Rx.Observable.create(function (observer) {  
    var timerId = Rx.Observable.timer(0, interval)
      .subscribe(count => {
        if (count>=times) {
          console.log('')
          observer.onNext(true);
          // I believe this already cleans up internal
          // references to setTimeout but I am not sure
          observer.onCompleted();
        }
      });

    // creating a disposable
    return () => {
      console.log('dispose')
      timerId.dispose();
    }
  });

}

您可以在这里随意玩这个例子: https://jsbin.com/vojiduzopo/edit?js,console,output 还有一个版本,我使用 setInterval 调用 'clearInterval' 作为一次性的。

当您使用 Rx.Observable.Create 创建自己的 Observable 时,您负责清理内存(如果需要)。在您的示例中,您创建了一个计时器,如果不清理它会导致内存泄漏。

流完成后,将调用流中每个 Observable 的 dispose() 函数。手动调用 dispose() 表示您不再对来自上游的结果感兴趣,并且还会导致 dispose() 链接到上游,除非被 share() 之类的东西阻止。

这是它的工作原理。

  1. 您将函数传递给 Observable.create。只要您正在创建的可观察对象被订阅,就会调用该函数。
  2. 您的函数接收一个 Subscriber 公开 nexterrorcomplete。你的工作是以正确的顺序调用这三个函数,从下游观察者的角度构建看起来像值序列的东西。
  3. 为了对您收到的 Subscriber 进行正确的调用,您可能会获取资源,例如订阅您正在使用的更基本的可观察对象。我们将调用这些上游可观察量。 (记住:订阅需要资源,特别是内存)。您在创建函数主体中进行的订阅称为内部订阅。
  4. 您的创建函数 returns 一个将成为您的处置方法的函数。当您创建的 observable 被释放时,它将被调用,或者通过调用它的 complete() 方法,它的错误输出,或者订阅者手动调用 dispose.

您对这个过程的理解不正确。当您向下游观察者发出完成信号时,您下游的事物将被清理,然后您的 dispose 方法将被调用,以便您可以清理自己。你确实有一些清理,因为你在计时器上进行的内部订阅不会因为下游被处理而被清理。在您自己处理之前,该计时器仍在滴答作响。它不知道它下游的任何事情,因为你是将它连接到下游观察者的人。您在底部处理定时器子的代码是避免泄漏所必需的。