Pending Request | NestJS WebSocket Observer

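I am developing a NestJS application that requests data from the Binance WebSocket API. I can see all the data in console.log, but in Postman or a browser window the request stays pending. I don't understand what is wrong. How can I get the data in Postman or a browser window? Please help.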

Update

I built a new WebSocket server that forwards data from Binance's WS to the UI. On the UI side I only receive the first item. What is wrong?

Coin.gateway.ts

import { MessageBody, SubscribeMessage, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';
import { from, of, take, map, Observable } from 'rxjs';
import { Coin } from './classes/coin';
import * as coinlist from './list/coins.json'


@WebSocketGateway(811, {transports: ['websocket', 'polling'], cors: true})
export class CoinGateway {

  @WebSocketServer()
  server: Server;

  @SubscribeMessage('events')
  handleMessage(@MessageBody() data: any) {
    console.log('data',data)
    const coins = new Coin(coinlist, 'usdt', 'miniTicker')
    return coins.getCryptoData().pipe(map((c) => {
      return c
    }))
  }
}

Coin.ts

import { GetCryptocurrencies } from "./abstract/get-cryptocurrencies";
import { WebSocket } from "ws";
import { Logger } from "@nestjs/common";
import { Observable } from "rxjs";

export class Coin extends GetCryptocurrencies {
    private readonly logger = new Logger(Coin.name)
    private baseUrl: string
    private url: string
    constructor(coin: { name: string, symbol: string }[], pair: string, method: string) {
        super(coin, pair, method)
        this.baseUrl = 'wss://stream.binance.com:9443/stream?streams='
        this.url = coin.map((c) => {
            return `${c.symbol.toLowerCase()}${pair}@${method}`
        }).join('/')
    }

    getCryptoData(): any {
        const stream$ = new Observable((observer) => {

            const ws = new WebSocket(`${this.baseUrl}${this.url}`)
            ws.on('open', () => {
                this.logger.log('Connection established')
            })
            ws.onmessage = (msg: any) => {
                const message = JSON.parse(msg.data)
                observer.next(message)
            }
            ws.on('close', () => {
                this.logger.log('Connection closed')
            })
        })
        return stream$
    }
}

Client UI, using the useEffect hook

useEffect(() => {
    const socket = io('ws://localhost:811', {transports: ['websocket']})
    socket.on('connect', () => {
        console.log('Connection established from client')

        socket.emit('events', '', (res: any) => {
            console.log(res)
        })
        const engine = socket.io.engine;
        console.log(engine.transport.name); // in most cases, prints "polling"

        engine.once("upgrade", () => {
            // called when the transport is upgraded (i.e. from HTTP long-polling to WebSocket)
            console.log(engine.transport.name); // in most cases, prints "websocket"
        });

        engine.on("packetCreate", ({ type, data }) => {
            // called for each packet sent
            console.log('Stype', type)
            console.log('Sdata', data)
        });

    })
}, [])

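This happens because you keep the Observable (and, implicitly, the underlying WebSocket) alive, which means the HTTP GET call never completes.

You can fix it by making sure the Observable completes once the first piece of data arrives from the WS channel. So add take(1) to the pipe, like this: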

import { Injectable } from '@nestjs/common';
import { Observable } from 'rxjs';
import { map, take } from 'rxjs/operators';

......................................

@Injectable()
export class GetDataService {
    getCoins(): Observable<any[]> {
        const coins = new Coin(coinlist, 'usdt', 'miniTicker')
        return coins.getCryptoData().pipe(
            take(1), // <---- I've added this: complete after the first message
            map((c) => {
                console.log(c)
                return c
            })
        )
    }
}

It should solve your problem.
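
To make the reasoning concrete, here is a minimal, dependency-free sketch of what take(1) does to an endless stream. It is a simplified model, not RxJS itself: FakeTickerSocket, tickerSource and the hand-written take are hypothetical names made up for this illustration. In real RxJS, the function returned from the callback passed to new Observable(...) plays the role of the teardown shown here. The Coin.getCryptoData in the question does not return one, so the Binance socket would presumably stay open even after take(1) completes the stream.

```typescript
// Simplified stand-ins for RxJS concepts; every name here is hypothetical.
type Observer<T> = { next: (value: T) => void; complete: () => void };
type Teardown = () => void;
type Source<T> = (observer: Observer<T>) => Teardown;

// Stand-in for the Binance socket: messages are pushed in by hand.
class FakeTickerSocket {
  closed = false;
  private listener: ((msg: string) => void) | null = null;

  onMessage(listener: (msg: string) => void): void {
    this.listener = listener;
  }

  push(msg: string): void {
    if (!this.closed && this.listener) this.listener(msg);
  }

  close(): void {
    this.closed = true;
  }
}

// An endless source: it never calls complete() on its own.
function tickerSource(socket: FakeTickerSocket): Source<string> {
  return (observer) => {
    socket.onMessage((msg) => observer.next(msg));
    // Teardown: release the socket once the consumer stops listening.
    return () => socket.close();
  };
}

// Model of take(n): forward n values, then complete and tear down the source.
function take<T>(count: number, source: Source<T>): Source<T> {
  return (observer) => {
    let seen = 0;
    let active = true;
    let stopSource: Teardown = () => {};

    const stop = (): void => {
      if (!active) return;
      active = false;
      stopSource();
    };

    stopSource = source({
      next: (value) => {
        if (!active) return;
        observer.next(value);
        seen += 1;
        if (seen >= count) {
          observer.complete(); // this is the signal the HTTP handler waits for
          stop();
        }
      },
      complete: () => {
        if (!active) return;
        observer.complete();
        stop();
      },
    });
    // If the source already finished synchronously, tear it down now.
    if (!active) stopSource();
    return stop;
  };
}

const socket = new FakeTickerSocket();
const result = { received: [] as string[], completed: false };

take(1, tickerSource(socket))({
  next: (msg) => { result.received.push(msg); },
  complete: () => { result.completed = true; },
});

socket.push('{"s":"BTCUSDT"}');
socket.push('{"s":"ETHUSDT"}'); // ignored: the stream has already completed

console.log(result.received.length, result.completed, socket.closed);
```

Without the take wrapper, complete is never called, which is the situation that leaves the GET request pending. With it, the first message is delivered, the stream completes, and the teardown closes the socket.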