如何使用 ws ( Websockets ) 和 RxJs 在 NestJS 中连接一个长 运行 进程
How to hook up a Long running Process in NestJS with ws ( Websockets ) and RxJs
我想 运行 在函数中编码 - 然后 return 作为 websocket Observable。有效地监控一个漫长的 运行ning 过程。我不知道如何通过 websockets 以这种格式 return 正确地获取值。
我漫长的运行宁过程:(显然不会实际花费很长时间)
import { Observable } from 'rxjs';
export function longRunningProcess (): Observable<unknown> {
return new Observable(subscriber => {
subscriber.next('End of step 1');
subscriber.next('End of step 2');
subscriber.next('End of step 3');
setTimeout(() => {
subscriber.next('End of Step 4');
subscriber.complete();
}, 1000);
});
}
我的 NestJS 端点 return 到 ws ( Websocket )
import { WsAdapter } from '@nestjs/platform-ws';
import {
MessageBody,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
WsResponse,
} from '@nestjs/websockets';
import { from, Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { Server } from 'ws';
import { longRunningProcess } from './test'
@WebSocketGateway()
export class EventsGateway {
@WebSocketServer()
server: Server;
@SubscribeMessage('events')
// send {"event":"events","data":"test"} in websockets
findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
return from(longRunningProcess) // Not really sure how to return this
//return from([1, 2, 3]).pipe(map(item => ({ event: 'events', data: item }))); //<< this works from the sample
}
@SubscribeMessage('identity')
async identity (@MessageBody() data: number): Promise<number> {
return data;
}
}
只是 map
你从 longRunningProcess
得到的结果,就像你对数字数组所做的那样。
@SubscribeMessage('events')
findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
return longRunningProcess().pipe(map(item => ({ event: 'events', data: item })));
}
我想 运行 在函数中编码 - 然后 return 作为 websocket Observable。有效地监控一个漫长的 运行ning 过程。我不知道如何通过 websockets 以这种格式 return 正确地获取值。
我漫长的运行宁过程:(显然不会实际花费很长时间)
import { Observable } from 'rxjs';
export function longRunningProcess (): Observable<unknown> {
return new Observable(subscriber => {
subscriber.next('End of step 1');
subscriber.next('End of step 2');
subscriber.next('End of step 3');
setTimeout(() => {
subscriber.next('End of Step 4');
subscriber.complete();
}, 1000);
});
}
我的 NestJS 端点 return 到 ws ( Websocket )
import { WsAdapter } from '@nestjs/platform-ws';
import {
MessageBody,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
WsResponse,
} from '@nestjs/websockets';
import { from, Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { Server } from 'ws';
import { longRunningProcess } from './test'
@WebSocketGateway()
export class EventsGateway {
@WebSocketServer()
server: Server;
@SubscribeMessage('events')
// send {"event":"events","data":"test"} in websockets
findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
return from(longRunningProcess) // Not really sure how to return this
//return from([1, 2, 3]).pipe(map(item => ({ event: 'events', data: item }))); //<< this works from the sample
}
@SubscribeMessage('identity')
async identity (@MessageBody() data: number): Promise<number> {
return data;
}
}
只是 map
你从 longRunningProcess
得到的结果,就像你对数字数组所做的那样。
@SubscribeMessage('events')
findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
return longRunningProcess().pipe(map(item => ({ event: 'events', data: item })));
}