Angular2/Websocket:如何 return 对传入的 websocket 消息进行观察
Angular2/Websocket: how to return an observable for incoming websocket messages
我将使用 Angular2 接收 websocket 传入消息并根据接收到的消息更新网页。现在,我正在使用虚拟 echo websocket 服务并将替换它。
根据我的理解,接收 websocket 消息的函数必须 return 一个由将更新网页的处理程序订阅的可观察对象。但是我不知道如何 return observable。
下面附有代码片段。 MonitorService
创建一个 websocket 连接和 return 一个包含接收到的消息的可观察对象。
@Injectable()
export class MonitorService {
private actionUrl: string;
private headers: Headers;
private websocket: any;
private receivedMsg: any;
constructor(private http: Http, private configuration: AppConfiguration) {
this.actionUrl = configuration.BaseUrl + 'monitor/';
this.headers = new Headers();
this.headers.append('Content-Type', 'application/json');
this.headers.append('Accept', 'application/json');
}
public GetInstanceStatus = (): Observable<Response> => {
this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service
this.websocket.onopen = (evt) => {
this.websocket.send("Hello World");
};
this.websocket.onmessage = (evt) => {
this.receivedMsg = evt;
};
return new Observable(this.receivedMsg).share();
}
}
下面是另一个组件,它订阅上面的可观察 returned 并相应地更新网页。
export class InstanceListComponent {
private instanceStatus: boolean
private instanceName: string
private instanceIcon: string
constructor(private monitor: MonitorService) {
this.monitor.GetInstanceStatus().subscribe((result) => {
this.setInstanceProperties(result);
});
}
setInstanceProperties(res:any) {
this.instanceName = res.Instance.toUpperCase();
this.instanceStatus = res.Status;
if (res.Status == true)
{
this.instanceIcon = "images/icon/healthy.svg#Layer_1";
} else {
this.instanceIcon = "images/icon/cancel.svg#cancel";
}
}
}
现在,我 运行 在浏览器控制台中遇到了这个错误
TypeError: this._subscribe is not a function
我把它放在 plunker 上,并添加了一个向 Websocket 端点发送消息的函数。这是重要的编辑:
public GetInstanceStatus(): Observable<any>{
this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service
this.websocket.onopen = (evt) => {
this.websocket.send("Hello World");
};
return Observable.create(observer=>{
this.websocket.onmessage = (evt) => {
observer.next(evt);
};
})
.share();
}
更新
正如您在评论中提到的,更好的替代方法是使用 Observable.fromEvent()
websocket = new WebSocket("ws://echo.websocket.org/");
public GetInstanceStatus(): Observable<Event>{
return Observable.fromEvent(this.websocket,'message');
}
plunker example 对于 Observable.fromEvent()
;
此外,您可以使用 WebSocketSubject
来完成它,尽管它看起来还没有准备好(从 rc.4 开始):
constructor(){
this.websocket = WebSocketSubject.create("ws://echo.websocket.org/");
}
public sendMessage(text:string){
let msg = {msg:text};
this.websocket.next(JSON.stringify(msg));
}
从套接字获取 onMessage 数据。
import { Injectable } from '@angular/core';
import {Observable} from 'rxjs/Rx';
@Injectable()
export class HpmaDashboardService {
private socketUrl: any = 'ws://127.0.0.0/util/test/dataserver/ws';
private websocket: any;
public GetAllInstanceStatus(objStr): Observable<any> {
this.websocket = new WebSocket(this.socketUrl);
this.websocket.onopen = (evt) => {
this.websocket.send(JSON.stringify(objStr));
};
return Observable.create(observer => {
this.websocket.onmessage = (evt) => {
observer.next(evt);
};
}).map(res => res.data).share();
}
**Get only single mesage from socket.**
public GetSingleInstanceStatus(objStr): Observable<any> {
this.websocket = new WebSocket(this.socketUrl);
this.websocket.onopen = (evt) => {
this.websocket.send(JSON.stringify(objStr));
};
return Observable.create(observer => {
this.websocket.onmessage = (evt) => {
observer.next(evt);
this.websocket.close();
};
}).map(res => res.data).share();
}
}
我使用的另一种方法是 subject
:
export class WebSocketClient {
private client: WebSocket | undefined;
private subject = new Subject<string>();
...
private connect() {
const client = new WebSocket(fakeUrl);
const client.onmessage = (event) => {
this.subject.next(event.data);
};
}
private watch() { return this.subject } // can be mapped
}
我认为使用它会更清楚:
const client = new WebSocketClient(); // can also be injected
client.connect();
client.watch().subscribe(x => ...);
编码愉快!
我将使用 Angular2 接收 websocket 传入消息并根据接收到的消息更新网页。现在,我正在使用虚拟 echo websocket 服务并将替换它。
根据我的理解,接收 websocket 消息的函数必须 return 一个由将更新网页的处理程序订阅的可观察对象。但是我不知道如何 return observable。
下面附有代码片段。 MonitorService
创建一个 websocket 连接和 return 一个包含接收到的消息的可观察对象。
@Injectable()
export class MonitorService {
private actionUrl: string;
private headers: Headers;
private websocket: any;
private receivedMsg: any;
constructor(private http: Http, private configuration: AppConfiguration) {
this.actionUrl = configuration.BaseUrl + 'monitor/';
this.headers = new Headers();
this.headers.append('Content-Type', 'application/json');
this.headers.append('Accept', 'application/json');
}
public GetInstanceStatus = (): Observable<Response> => {
this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service
this.websocket.onopen = (evt) => {
this.websocket.send("Hello World");
};
this.websocket.onmessage = (evt) => {
this.receivedMsg = evt;
};
return new Observable(this.receivedMsg).share();
}
}
下面是另一个组件,它订阅上面的可观察 returned 并相应地更新网页。
export class InstanceListComponent {
private instanceStatus: boolean
private instanceName: string
private instanceIcon: string
constructor(private monitor: MonitorService) {
this.monitor.GetInstanceStatus().subscribe((result) => {
this.setInstanceProperties(result);
});
}
setInstanceProperties(res:any) {
this.instanceName = res.Instance.toUpperCase();
this.instanceStatus = res.Status;
if (res.Status == true)
{
this.instanceIcon = "images/icon/healthy.svg#Layer_1";
} else {
this.instanceIcon = "images/icon/cancel.svg#cancel";
}
}
}
现在,我 运行 在浏览器控制台中遇到了这个错误
TypeError: this._subscribe is not a function
我把它放在 plunker 上,并添加了一个向 Websocket 端点发送消息的函数。这是重要的编辑:
public GetInstanceStatus(): Observable<any>{
this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service
this.websocket.onopen = (evt) => {
this.websocket.send("Hello World");
};
return Observable.create(observer=>{
this.websocket.onmessage = (evt) => {
observer.next(evt);
};
})
.share();
}
更新
正如您在评论中提到的,更好的替代方法是使用 Observable.fromEvent()
websocket = new WebSocket("ws://echo.websocket.org/");
public GetInstanceStatus(): Observable<Event>{
return Observable.fromEvent(this.websocket,'message');
}
plunker example 对于 Observable.fromEvent()
;
此外,您可以使用 WebSocketSubject
来完成它,尽管它看起来还没有准备好(从 rc.4 开始):
constructor(){
this.websocket = WebSocketSubject.create("ws://echo.websocket.org/");
}
public sendMessage(text:string){
let msg = {msg:text};
this.websocket.next(JSON.stringify(msg));
}
从套接字获取 onMessage 数据。
import { Injectable } from '@angular/core';
import {Observable} from 'rxjs/Rx';
@Injectable()
export class HpmaDashboardService {
private socketUrl: any = 'ws://127.0.0.0/util/test/dataserver/ws';
private websocket: any;
public GetAllInstanceStatus(objStr): Observable<any> {
this.websocket = new WebSocket(this.socketUrl);
this.websocket.onopen = (evt) => {
this.websocket.send(JSON.stringify(objStr));
};
return Observable.create(observer => {
this.websocket.onmessage = (evt) => {
observer.next(evt);
};
}).map(res => res.data).share();
}
**Get only single mesage from socket.**
public GetSingleInstanceStatus(objStr): Observable<any> {
this.websocket = new WebSocket(this.socketUrl);
this.websocket.onopen = (evt) => {
this.websocket.send(JSON.stringify(objStr));
};
return Observable.create(observer => {
this.websocket.onmessage = (evt) => {
observer.next(evt);
this.websocket.close();
};
}).map(res => res.data).share();
}
}
我使用的另一种方法是 subject
:
export class WebSocketClient {
private client: WebSocket | undefined;
private subject = new Subject<string>();
...
private connect() {
const client = new WebSocket(fakeUrl);
const client.onmessage = (event) => {
this.subject.next(event.data);
};
}
private watch() { return this.subject } // can be mapped
}
我认为使用它会更清楚:
const client = new WebSocketClient(); // can also be injected
client.connect();
client.watch().subscribe(x => ...);
编码愉快!