如何使用 ws ( Websockets ) 和 RxJs 在 NestJS 中连接一个长 运行 进程

How to hook up a Long running Process in NestJS with ws ( Websockets ) and RxJs

我想 运行 在函数中编码 - 然后 return 作为 websocket Observable。有效地监控一个漫长的 运行ning 过程。我不知道如何通过 websockets 以这种格式 return 正确地获取值。

我漫长的运行宁过程:(显然不会实际花费很长时间)

import { Observable } from 'rxjs';

export function longRunningProcess (): Observable<unknown> {

    return new Observable(subscriber => {
        subscriber.next('End of step 1');
        subscriber.next('End of step 2');
        subscriber.next('End of step 3');
        setTimeout(() => {
            subscriber.next('End of Step 4');
            subscriber.complete();
        }, 1000);
    });
}

我的 NestJS 端点 return 到 ws ( Websocket )

import { WsAdapter } from '@nestjs/platform-ws';
import {
    MessageBody,
    SubscribeMessage,
    WebSocketGateway,
    WebSocketServer,
    WsResponse,
} from '@nestjs/websockets';
import { from, Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { Server } from 'ws';
import { longRunningProcess } from './test'

@WebSocketGateway()
export class EventsGateway {
    @WebSocketServer()
    server: Server;

    @SubscribeMessage('events')
    // send {"event":"events","data":"test"} in websockets
    findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
        return from(longRunningProcess) // Not really sure how to return this 
        //return from([1, 2, 3]).pipe(map(item => ({ event: 'events', data: item })));  //<< this works from the sample
    }

    @SubscribeMessage('identity')
    async identity (@MessageBody() data: number): Promise<number> {
        return data;
    }
}

只是 map 你从 longRunningProcess 得到的结果,就像你对数字数组所做的那样。

@SubscribeMessage('events')
findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
    return longRunningProcess().pipe(map(item => ({ event: 'events', data: item })));
}