Angular2/Websocket:如何 return 对传入的 websocket 消息进行观察

Angular2/Websocket: how to return an observable for incoming websocket messages

我将使用 Angular2 接收 websocket 传入消息并根据接收到的消息更新网页。现在,我正在使用虚拟 echo websocket 服务并将替换它。

根据我的理解,接收 websocket 消息的函数必须 return 一个由将更新网页的处理程序订阅的可观察对象。但是我不知道如何 return observable。

下面附有代码片段。 MonitorService 创建一个 websocket 连接和 return 一个包含接收到的消息的可观察对象。

@Injectable()
export class MonitorService {

    private actionUrl: string;
    private headers: Headers;
    private websocket: any;
    private receivedMsg: any;
    constructor(private http: Http, private configuration: AppConfiguration) {

        this.actionUrl = configuration.BaseUrl + 'monitor/';
        this.headers = new Headers();
        this.headers.append('Content-Type', 'application/json');
        this.headers.append('Accept', 'application/json');
    }

    public GetInstanceStatus = (): Observable<Response> => {
        this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service
        this.websocket.onopen =  (evt) => {
            this.websocket.send("Hello World");
        };

        this.websocket.onmessage = (evt) => { 
            this.receivedMsg = evt;
        };

        return new Observable(this.receivedMsg).share();
    }

}

下面是另一个组件,它订阅上面的可观察 returned 并相应地更新网页。

export class InstanceListComponent {
  private instanceStatus: boolean
  private instanceName: string
  private instanceIcon: string
  constructor(private monitor: MonitorService) { 
    this.monitor.GetInstanceStatus().subscribe((result) => {
        this.setInstanceProperties(result);
    });
  }

  setInstanceProperties(res:any) {
    this.instanceName = res.Instance.toUpperCase();
    this.instanceStatus = res.Status;
    if (res.Status == true)
    {
      this.instanceIcon = "images/icon/healthy.svg#Layer_1";
    } else {
      this.instanceIcon = "images/icon/cancel.svg#cancel";
    }
  }
}

现在,我 运行 在浏览器控制台中遇到了这个错误 TypeError: this._subscribe is not a function

我把它放在 plunker 上,并添加了一个向 Websocket 端点发送消息的函数。这是重要的编辑:

public GetInstanceStatus(): Observable<any>{
    this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service
    this.websocket.onopen =  (evt) => {
        this.websocket.send("Hello World");
    };
    return Observable.create(observer=>{
        this.websocket.onmessage = (evt) => { 
            observer.next(evt);
        };
    })
    .share();
}

更新
正如您在评论中提到的,更好的替代方法是使用 Observable.fromEvent()

websocket = new WebSocket("ws://echo.websocket.org/");
public GetInstanceStatus(): Observable<Event>{
    return Observable.fromEvent(this.websocket,'message');
}

plunker example 对于 Observable.fromEvent();

此外,您可以使用 WebSocketSubject 来完成它,尽管它看起来还没有准备好(从 rc.4 开始):

constructor(){
  this.websocket = WebSocketSubject.create("ws://echo.websocket.org/");
}

public sendMessage(text:string){
  let msg = {msg:text};
  this.websocket.next(JSON.stringify(msg));
} 

plunker example

从套接字获取 onMessage 数据。

import { Injectable } from '@angular/core';
import {Observable} from 'rxjs/Rx';


@Injectable()

export class HpmaDashboardService {
private socketUrl: any = 'ws://127.0.0.0/util/test/dataserver/ws';
private websocket: any;      

public GetAllInstanceStatus(objStr): Observable<any> {
      this.websocket = new WebSocket(this.socketUrl);
      this.websocket.onopen =  (evt) => {
        this.websocket.send(JSON.stringify(objStr));
      };
      return Observable.create(observer => {
        this.websocket.onmessage = (evt) => {
          observer.next(evt);
        };
      }).map(res => res.data).share();
 }

**Get only single mesage from socket.**

    public GetSingleInstanceStatus(objStr): Observable<any> {
      this.websocket = new WebSocket(this.socketUrl);
      this.websocket.onopen =  (evt) => {
        this.websocket.send(JSON.stringify(objStr));
      };
      return Observable.create(observer => {
        this.websocket.onmessage = (evt) => {
          observer.next(evt);
          this.websocket.close();
        };
      }).map(res => res.data).share();
  }

}

我使用的另一种方法是 subject:

export class WebSocketClient {
   private client: WebSocket | undefined;
   private subject = new Subject<string>();

   ...
   private connect() {
      const client = new WebSocket(fakeUrl);
      const client.onmessage = (event) => {
          this.subject.next(event.data);
      };
   }

   private watch() { return this.subject } // can be mapped
}

我认为使用它会更清楚:

const client = new WebSocketClient(); // can also be injected
client.connect();
client.watch().subscribe(x => ...);

编码愉快!