可观察对象丢失的听众问题

Issue with the lost listener of the observable

我对服务中可观察主题的丢失侦听器有一些疑问。当应用程序初始化一段时间后它工作正常 - 它响应来自网络套接字连接的消息,执行这些消息的处理程序。但是由于某些未知原因,在某个时间点,可观察主题的侦听器从订阅者列表中消失了。

我尝试调试这个问题,发现当收到来自网络套接字的消息时,我的 Observable 的 messages$ 流订阅者列表是空的。

app.component.ts:

import { Component, OnInit } from "@angular/core";
import { filter, takeUntil } from "rxjs/operators";
import { SocketService } from "./services/socket.service";
import { NgOnDestroy } from "./utils/ng-on-destroy";

@Component({
  selector: "my-app",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"],
  providers: [NgOnDestroy]
})
export class AppComponent implements OnInit {
  constructor(
    private ngOnDestroy: NgOnDestroy,
    private socketService: SocketService
  ) {}

  ngOnInit() {
    this.socketService.messages$
      .pipe(
        takeUntil(this.ngOnDestroy),
        filter((data: any) => !!data.method)
      )
      .subscribe((data: any) => {
        // ... some code
      });
  }
}

socket.service.ts:

import { Subject } from "rxjs";
import { Injectable } from "@angular/core";

@Injectable({
  providedIn: 'root'
})
export class SocketService {

  private ws: WebSocket | null = null;

  open$ = new Subject();

  messages$ = new Subject();

  constructor() {
  }

  public send(data: any): void {
    this.ws?.send(
      JSON.stringify(data)
    );
  }

  public connect(): void {
    try {
      this.ws = new WebSocket(`wss://${window.location.host}/notifications`);

      this.ws.addEventListener('open', _ => {
        this.open$.next();
      });

      this.ws.addEventListener('message', (message: MessageEvent) => {
        this.messages$.next(
          JSON.parse(message.data)
        );
      });

    } catch (e) {
      console.log('WEBSOCKET CONNECTION ERROR!!!');
    }
  }

  public close(): void {
    this.ws?.close();
  }
}

ng-on-destroy.ts:

import { Injectable, OnDestroy } from "@angular/core";
import { Subject } from "rxjs";

@Injectable()
export class NgOnDestroy extends Subject<null> implements OnDestroy {
  ngOnDestroy() {
    this.next(null);
    this.complete();
  }
}

我不明白为什么会发生这种情况,因为我的代码中除了 NgOnDestroy class 之外没有任何明确的取消订阅。虽然我真的不知道,但我不相信取消订阅会发生在其中,假设 AppComponent 永远不会被销毁。你能帮我解决这个问题吗?

一段时间后,我弄清楚订阅对象究竟发生了什么:消息处理程序执行期间发生异常,导致订阅关闭。