Websocket epic that receives connection & message requests and emits messages & connection status updates

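I want to create a redux-observable epic that can sit apart from the rest of my application.

The epic needs to listen for incoming socket connection requests, establish the socket connection, and then emit status updates when the connection is established or lost. It also needs to be able to send and receive messages, which can then be processed elsewhere.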

The closest I have come is the answer provided in this question:

const somethingEpic = action$ =>
  action$.ofType('START_SOCKET_OR_WHATEVER')
    .switchMap(action =>
      Observable.webSocket('ws://localhost:8081')
        .map(response => ({ type: 'RECEIVED_MESSAGE', payload: response }))
    );

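However, I am not sure how to extend this so that it also emits connection established and connection lost events, and also accepts messages to be sent to the server.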

In general, it sounds like you want something like this:

(Note: this is untested code, but it should be pretty close to runnable.)

// RxJS 5 style: importing from 'rxjs/Rx' also patches in every operator used below
import { Observable, Subject } from 'rxjs/Rx';

const somethingEpic = action$ =>
  action$.ofType('START_SOCKET_OR_WHATEVER')
    .switchMap(action => {
      // Subjects are a combination of an Observer *and* an Observable
      // so webSocket can call openObserver$.next(event) and
      // anyone who is subscribing to openObserver$ will receive it
      // because Subjects are "hot"
      const openObserver$ = new Subject();
      const closeObserver$ = new Subject();

      // Listen for our open/close events and transform them
      // to redux actions. We could also include values from
      // the events like event.reason, etc if we wanted
      const open$ = openObserver$.map((event) => ({
        type: 'SOCKET_CONNECTED'
      }));
      const close$ = closeObserver$.map((event) => ({
        type: 'SOCKET_DISCONNECTED'
      }));

      // webSocket has an overload signature that accepts this object
      const options = {
        url: 'ws://localhost:8081',
        openObserver: openObserver$,
        closeObserver: closeObserver$
      };
      const msg$ = Observable.webSocket(options)
        .map(response => ({ type: 'RECEIVED_MESSAGE', payload: response }))
        .catch(e => Observable.of({
          type: 'SOCKET_ERROR',
          payload: e.message
        }));

      // We're merging them all together because we want to listen for
      // and emit actions from all three. For good measure I also included
      // a generic .takeUntil() to demonstrate the most obvious way to stop
      // the websocket (as well as the open/close, which we shouldn't forget!)
      // Also notice that I'm listening for either STOP_SOCKET_OR_WHATEVER
      // or SOCKET_ERROR, because we want to stop subscribing
      // to open$/close$ if there is an error.
      return Observable.merge(open$, close$, msg$)
        .takeUntil(action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR'));
    });
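
Since the question mentions handling the status updates and messages elsewhere, here is a minimal sketch of a reducer that could consume the actions this epic emits. The reducer name `socketReducer` and the state shape (`connected`, `messages`, `lastError`) are assumptions made for illustration. Only the action types come from the code above.

```javascript
// Hypothetical reducer: the name and the state shape are assumptions.
const initialState = { connected: false, messages: [], lastError: null };

function socketReducer(state = initialState, action) {
  switch (action.type) {
    case 'SOCKET_CONNECTED':
      return { ...state, connected: true, lastError: null };
    case 'SOCKET_DISCONNECTED':
      return { ...state, connected: false };
    case 'RECEIVED_MESSAGE':
      // Build a new array instead of mutating the existing one
      return { ...state, messages: [...state.messages, action.payload] };
    case 'SOCKET_ERROR':
      return { ...state, connected: false, lastError: action.payload };
    default:
      return state;
  }
}

// Replay the kind of action sequence the epic might emit
const finalState = [
  { type: 'SOCKET_CONNECTED' },
  { type: 'RECEIVED_MESSAGE', payload: { text: 'hello' } },
  { type: 'SOCKET_DISCONNECTED' }
].reduce(socketReducer, initialState);
```

After this sequence, `finalState.connected` is `false` and `finalState.messages` holds the single received payload. The epic itself stays unaware of how the state is stored.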

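If this epic ever needs to support multiple sockets at once, you'll need to come up with some way of uniquely identifying a specific connection, and modify the code to filter signals based on that. For example: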

.takeUntil(
  action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR')
    .filter(action => action.someHowHaveId === someHowHaveId)
);
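
To make the filtering idea a little more concrete, here is a rough sketch of how an id could travel with each action. The `socketId` field, the action creators `startSocket` and `stopSocket`, and the helper `isStopSignalFor` are all hypothetical names standing in for the `someHowHaveId` placeholder. They are not part of redux-observable or RxJS.

```javascript
// Hypothetical action creators: the socketId field is an assumption.
const startSocket = (socketId, url) => ({ type: 'START_SOCKET_OR_WHATEVER', socketId, url });
const stopSocket = socketId => ({ type: 'STOP_SOCKET_OR_WHATEVER', socketId });

// Builds the predicate that could be handed to .filter() inside .takeUntil()
const isStopSignalFor = socketId => action =>
  (action.type === 'STOP_SOCKET_OR_WHATEVER' || action.type === 'SOCKET_ERROR') &&
  action.socketId === socketId;

// Only a stop/error action carrying the matching id should end the "chat" socket
const stopsChat = isStopSignalFor('chat');
const results = [
  stopSocket('chat'),
  stopSocket('prices'),
  startSocket('chat', 'ws://localhost:8081')
].map(stopsChat);
// results is [true, false, false]
```

For this to work, the epic would also need to copy the id onto the `SOCKET_ERROR` action it emits. Note too that `switchMap` unsubscribes from the previous inner observable whenever a new start action arrives, so a version that keeps several sockets open at once would presumably use `mergeMap` instead.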

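For anyone looking to do exactly what I was doing, my final code is below. I eventually realised that what I really needed was one epic for connecting and emitting received messages, and a second epic for sending messages.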

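import { of, merge, Subject } from "rxjs";
import { webSocket } from "rxjs/webSocket";
import { map, mergeMap, switchMap, take, takeUntil } from "rxjs/operators";
import { ofType } from "redux-observable";

const notificationTypes = {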
  WEBSOCKET_TRY_CONNECT: "WEBSOCKET_TRY_CONNECT",
  WEBSOCKET_TRY_DISCONNECT: "WEBSOCKET_TRY_DISCONNECT",
  WEBSOCKET_CONNECTED: "WEBSOCKET_CONNECTED",
  WEBSOCKET_DISCONNECTED: "WEBSOCKET_DISCONNECTED",
  WEBSOCKET_ERROR: "WEBSOCKET_ERROR",
  WEBSOCKET_MESSAGE_SEND: "WEBSOCKET_MESSAGE_SEND",
  WEBSOCKET_MESSAGE_SENT: "WEBSOCKET_MESSAGE_SENT",
  WEBSOCKET_MESSAGE_RECIEVED: "WEBSOCKET_MESSAGE_RECIEVED"
};

const notificationActions = {
  tryConnect: () => ({ type: notificationTypes.WEBSOCKET_TRY_CONNECT }),
  tryDisconnect: () => ({ type: notificationTypes.WEBSOCKET_TRY_DISCONNECT }),
  sendNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_SEND, message }),
  sentNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_SENT, message }),
  receivedNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_RECIEVED, message }),
  connected: () => ({ type: notificationTypes.WEBSOCKET_CONNECTED }),
  disconnected: () => ({ type: notificationTypes.WEBSOCKET_DISCONNECTED }),
  error: error => ({ type: notificationTypes.WEBSOCKET_ERROR, error })
};

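// Module-level handle to the current socket, shared by both epics; null while disconnected.
// Note: wsLocation (the server URL used below) is not defined in this snippet
// and has to come from elsewhere in the application.
let webSocket$ = null;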

const notificationSendEpic = (action$, state$) =>
  action$.pipe(
    ofType(notificationTypes.WEBSOCKET_MESSAGE_SEND),
    mergeMap(action => {
        if (!webSocket$) {
          return of(notificationActions.error(`Attempted to send message while no connection was open.`));
        }
        webSocket$.next(action.message);
        return of(notificationActions.sentNotification(action.message));
    })
  );

const notificationConnectionEpic = (action$, state$) =>
  action$.pipe(
    ofType(notificationTypes.WEBSOCKET_TRY_CONNECT),
    switchMap(action => {

      if (webSocket$) {
        return of(notificationActions.error(`Attempted to open connection when one was already open.`));
      }

      const webSocketOpen$ = new Subject();
      const webSocketClose$ = new Subject();

      const open$ = webSocketOpen$.pipe(take(1), map(() => of(notificationActions.connected())));
      const close$ = webSocketClose$.pipe(take(1), map(() => {
        webSocket$ = null;
        return of(notificationActions.disconnected());
      }));

      webSocket$ = webSocket({
        url: wsLocation,
        openObserver: webSocketOpen$,
        closeObserver: webSocketClose$
      });

      const message$ = webSocket$.pipe(
        takeUntil(action$.pipe(ofType(notificationTypes.WEBSOCKET_DISCONNECTED, notificationTypes.WEBSOCKET_TRY_DISCONNECT))),
        map(evt => of(notificationActions.receivedNotification(evt)))
      );

      return merge(message$, open$, close$);

    }),
    mergeMap(v => v)
  );