接收连接和消息请求并发出消息和连接状态更新的 Websocket 史诗
Websocket epic that receives connection & message requests and emits messages & connection status updates
我希望创建一个 redux-observable 史诗,它可以与我的应用程序的其余部分分开。它需要:
- 侦听
{ type: "SOCKET_TRY_CONNECT" }
的传入操作,这也可能会在连接时忽略任何其他 SOCKET_TRY_CONNECT 事件。另外监听要发送的消息,可能 { type: "SOCKET_MESSAGE_SEND", data }
- 发出传出动作
{ type: "SOCKET_CONNECTED" }
、{ type: "SOCKET_DISCONNECTED", error }
和 { type: "SOCKET_MESSAGE_RECEIVE", data }
史诗需要监听传入的套接字连接请求,建立套接字连接,然后在连接建立或丢失时输出状态更新。它还需要能够发送和接收消息,然后可以在别处处理这些消息。
我最接近的是 问题中提供的答案:
const somethingEpic = action$ =>
action$.ofType('START_SOCKET_OR_WHATEVER')
.switchMap(action =>
Observable.webSocket('ws://localhost:8081')
.map(response => ({ type: 'RECEIVED_MESSAGE', paylod: response }))
);
但是我不确定如何扩展它以另外发出连接建立和断开事件,以及另外接受要发送到服务器的消息。
一般来说,听起来您想要这样的东西:
(注意,这是未经测试的代码,但应该非常接近可运行)
const somethingEpic = action$ =>
action$.ofType('START_SOCKET_OR_WHATEVER')
.switchMap(action => {
// Subjects are a combination of an Observer *and* an Observable
// so webSocket can call openObserver$.next(event) and
// anyone who is subscribing to openObserver$ will receive it
// because Subjects are "hot"
const openObserver$ = new Subject();
const openObserver$ = new Subject();
// Listen for our open/close events and transform them
// to redux actions. We could also include values from
// the events like event.reason, etc if we wanted
const open$ = openObserver$.map((event) => ({
type: 'SOCKET_CONNECTED'
}));
const close$ = openObserver$.map((event) => ({
type: 'SOCKET_DISCONNECTED'
}));
// webSocket has an overload signature that accepts this object
const options = {
url: 'ws://localhost:8081',
openObserver: openObserver$,
closeObserver: openObserver$
};
const msg$ = Observable.webSocket(options)
.map(response => ({ type: 'RECEIVED_MESSAGE', payload: response }))
.catch(e => Observable.of({
type: 'SOCKET_ERROR',
payload: e.message
}))
// We're merging them all together because we want to listen for
// and emit actions from all three. For good measure I also included
// a generic .takeUntil() to demonstrate the most obvious way to stop
// the websocket (as well as the open/close, which we shouldn't forget!)
// Also notice how I'm listening for both the STOP_SOCKET_OR_WHATEVER
// or also a SOCKET_ERROR because we want to stop subscribing
// to open$/close$ if there is an error.
return Observable.merge(open$, close$, msg$)
.takeUntil(action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR'));
});
如果这个史诗需要一次支持多个套接字,您将需要想出某种方法来唯一标识特定连接,并修改代码以基于此过滤信号。例如
.takeUntil(
action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR')
.filter(action => action.someHowHaveId === someHowHaveId)
);
对于任何想完全按照我的方式做的人,我的最终代码如下。我最终意识到我真的需要一个用于连接、发出消息的史诗,以及另一个用于发送消息的史诗。
const notificationTypes = {
WEBSOCKET_TRY_CONNECT: "WEBSOCKET_TRY_CONNECT",
WEBSOCKET_TRY_DISCONNECT: "WEBSOCKET_TRY_DISCONNECT",
WEBSOCKET_CONNECTED: "WEBSOCKET_CONNECTED",
WEBSOCKET_DISCONNECTED: "WEBSOCKET_DISCONNECTED",
WEBSOCKET_ERROR: "WEBSOCKET_ERROR",
WEBSOCKET_MESSAGE_SEND: "WEBSOCKET_MESSAGE_SEND",
WEBSOCKET_MESSAGE_SENT: "WEBSOCKET_MESSAGE_SENT",
WEBSOCKET_MESSAGE_RECIEVED: "WEBSOCKET_MESSAGE_RECIEVED"
};
const notificationActions = {
tryConnect: () => ({ type: notificationTypes.WEBSOCKET_TRY_CONNECT }),
tryDisconnect: () => ({ type: notificationTypes.WEBSOCKET_TRY_DISCONNECT }),
sendNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_SEND, message }),
sentNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_SENT, message }),
receivedNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_RECIEVED, message }),
connected: () => ({ type: notificationTypes.WEBSOCKET_CONNECTED }),
disconnected: () => ({ type: notificationTypes.WEBSOCKET_DISCONNECTED }),
error: error => ({ type: notificationTypes.WEBSOCKET_ERROR, error })
};
let webSocket$ = null;
const notificationSendEpic = (action$, state$) =>
action$.pipe(
ofType(notificationTypes.WEBSOCKET_MESSAGE_SEND),
mergeMap(action => {
if (!webSocket$) {
return of(notificationActions.error(`Attempted to send message while no connection was open.`));
}
webSocket$.next(action.message);
return of(notificationActions.sentNotification(action.message));
})
);
const notificationConnectionEpic = (action$, state$) =>
action$.pipe(
ofType(notificationTypes.WEBSOCKET_TRY_CONNECT),
switchMap(action => {
if (webSocket$) {
return of(notificationActions.error(`Attempted to open connection when one was already open.`));
}
const webSocketOpen$ = new Subject();
const webSocketClose$ = new Subject();
const open$ = webSocketOpen$.pipe(take(1),map(() => of(notificationActions.connected())));
const close$ = webSocketClose$.pipe(take(1),map(() => {
webSocket$ = null;
return of(notificationActions.disconnected());
}));
webSocket$ = webSocket({
url: wsLocation,
openObserver: webSocketOpen$,
closeObserver: webSocketClose$
});
const message$ = webSocket$.pipe(
takeUntil(action$.ofType(notificationTypes.WEBSOCKET_DISCONNECTED, notificationTypes.WEBSOCKET_TRY_DISCONNECT)),
map(evt => of(notificationActions.receivedNotification(evt)))
);
return merge(message$, open$, close$);
}),
mergeMap(v => v)
);
我希望创建一个 redux-observable 史诗,它可以与我的应用程序的其余部分分开。它需要:
- 侦听
{ type: "SOCKET_TRY_CONNECT" }
的传入操作,这也可能会在连接时忽略任何其他 SOCKET_TRY_CONNECT 事件。另外监听要发送的消息,可能{ type: "SOCKET_MESSAGE_SEND", data }
- 发出传出动作
{ type: "SOCKET_CONNECTED" }
、{ type: "SOCKET_DISCONNECTED", error }
和{ type: "SOCKET_MESSAGE_RECEIVE", data }
史诗需要监听传入的套接字连接请求,建立套接字连接,然后在连接建立或丢失时输出状态更新。它还需要能够发送和接收消息,然后可以在别处处理这些消息。
我最接近的是
const somethingEpic = action$ =>
action$.ofType('START_SOCKET_OR_WHATEVER')
.switchMap(action =>
Observable.webSocket('ws://localhost:8081')
.map(response => ({ type: 'RECEIVED_MESSAGE', paylod: response }))
);
但是我不确定如何扩展它以另外发出连接建立和断开事件,以及另外接受要发送到服务器的消息。
一般来说,听起来您想要这样的东西:
(注意,这是未经测试的代码,但应该非常接近可运行)
const somethingEpic = action$ =>
action$.ofType('START_SOCKET_OR_WHATEVER')
.switchMap(action => {
// Subjects are a combination of an Observer *and* an Observable
// so webSocket can call openObserver$.next(event) and
// anyone who is subscribing to openObserver$ will receive it
// because Subjects are "hot"
const openObserver$ = new Subject();
const openObserver$ = new Subject();
// Listen for our open/close events and transform them
// to redux actions. We could also include values from
// the events like event.reason, etc if we wanted
const open$ = openObserver$.map((event) => ({
type: 'SOCKET_CONNECTED'
}));
const close$ = openObserver$.map((event) => ({
type: 'SOCKET_DISCONNECTED'
}));
// webSocket has an overload signature that accepts this object
const options = {
url: 'ws://localhost:8081',
openObserver: openObserver$,
closeObserver: openObserver$
};
const msg$ = Observable.webSocket(options)
.map(response => ({ type: 'RECEIVED_MESSAGE', payload: response }))
.catch(e => Observable.of({
type: 'SOCKET_ERROR',
payload: e.message
}))
// We're merging them all together because we want to listen for
// and emit actions from all three. For good measure I also included
// a generic .takeUntil() to demonstrate the most obvious way to stop
// the websocket (as well as the open/close, which we shouldn't forget!)
// Also notice how I'm listening for both the STOP_SOCKET_OR_WHATEVER
// or also a SOCKET_ERROR because we want to stop subscribing
// to open$/close$ if there is an error.
return Observable.merge(open$, close$, msg$)
.takeUntil(action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR'));
});
如果这个史诗需要一次支持多个套接字,您将需要想出某种方法来唯一标识特定连接,并修改代码以基于此过滤信号。例如
.takeUntil(
action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR')
.filter(action => action.someHowHaveId === someHowHaveId)
);
对于任何想完全按照我的方式做的人,我的最终代码如下。我最终意识到我真的需要一个用于连接、发出消息的史诗,以及另一个用于发送消息的史诗。
const notificationTypes = {
WEBSOCKET_TRY_CONNECT: "WEBSOCKET_TRY_CONNECT",
WEBSOCKET_TRY_DISCONNECT: "WEBSOCKET_TRY_DISCONNECT",
WEBSOCKET_CONNECTED: "WEBSOCKET_CONNECTED",
WEBSOCKET_DISCONNECTED: "WEBSOCKET_DISCONNECTED",
WEBSOCKET_ERROR: "WEBSOCKET_ERROR",
WEBSOCKET_MESSAGE_SEND: "WEBSOCKET_MESSAGE_SEND",
WEBSOCKET_MESSAGE_SENT: "WEBSOCKET_MESSAGE_SENT",
WEBSOCKET_MESSAGE_RECIEVED: "WEBSOCKET_MESSAGE_RECIEVED"
};
const notificationActions = {
tryConnect: () => ({ type: notificationTypes.WEBSOCKET_TRY_CONNECT }),
tryDisconnect: () => ({ type: notificationTypes.WEBSOCKET_TRY_DISCONNECT }),
sendNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_SEND, message }),
sentNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_SENT, message }),
receivedNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_RECIEVED, message }),
connected: () => ({ type: notificationTypes.WEBSOCKET_CONNECTED }),
disconnected: () => ({ type: notificationTypes.WEBSOCKET_DISCONNECTED }),
error: error => ({ type: notificationTypes.WEBSOCKET_ERROR, error })
};
let webSocket$ = null;
const notificationSendEpic = (action$, state$) =>
action$.pipe(
ofType(notificationTypes.WEBSOCKET_MESSAGE_SEND),
mergeMap(action => {
if (!webSocket$) {
return of(notificationActions.error(`Attempted to send message while no connection was open.`));
}
webSocket$.next(action.message);
return of(notificationActions.sentNotification(action.message));
})
);
const notificationConnectionEpic = (action$, state$) =>
action$.pipe(
ofType(notificationTypes.WEBSOCKET_TRY_CONNECT),
switchMap(action => {
if (webSocket$) {
return of(notificationActions.error(`Attempted to open connection when one was already open.`));
}
const webSocketOpen$ = new Subject();
const webSocketClose$ = new Subject();
const open$ = webSocketOpen$.pipe(take(1),map(() => of(notificationActions.connected())));
const close$ = webSocketClose$.pipe(take(1),map(() => {
webSocket$ = null;
return of(notificationActions.disconnected());
}));
webSocket$ = webSocket({
url: wsLocation,
openObserver: webSocketOpen$,
closeObserver: webSocketClose$
});
const message$ = webSocket$.pipe(
takeUntil(action$.ofType(notificationTypes.WEBSOCKET_DISCONNECTED, notificationTypes.WEBSOCKET_TRY_DISCONNECT)),
map(evt => of(notificationActions.receivedNotification(evt)))
);
return merge(message$, open$, close$);
}),
mergeMap(v => v)
);