在 redux-observable 中关闭 websocket 连接
Close websocket connection in redux-observable
我目前正在使用 redux-observable
来处理 websocket 连接,但不知道什么是处理 close
事件的最佳方式。
const webSocket$ = Observable.webSocket({
url: 'ws://localhost:8080'
});
export const fetchMessagesEpic = (action$) =>
action$.ofType(FETCH.MESSAGES)
.switchMap((action) =>
webSocket$
.catch((err) => { console.log(err);return true; }) <-- I can see the close event here
.map((msg) => addMessage(msg)));
我应该在 catch
中处理它(从那里发送一个新动作,然后在同一史诗中使用 takeUntil
吗?
如果 CloseEvent's wasClean
值为 false,来自 rxjs 的 Observable.webSocket
(又名 WebSocketSubject)仅将关闭事件发送到错误路径。
socket.onclose = (e: CloseEvent) => {
this._resetState();
const closeObserver = this.closeObserver;
if (closeObserver) {
closeObserver.next(e);
}
// v------------------------ Right here
if (e.wasClean) {
observer.complete();
} else {
observer.error(e);
}
};
所以这意味着在你的情况下连接没有正确关闭。 CloseEvent 上有时会有一个 reason
字段,它可能会或可能不会为您提供洞察力,或者 Chrome Dev Tools Network 和 Console 选项卡可能也有更多。
当一个套接字被远程服务器完全关闭时,做一些特殊事情的正确方法是传入一个closeObserver
。直接是 { next: func }
观察者,或者更可能是 Subject(既是 Observable 又是观察者)
// a stream of CloseEvents
const webSocketClose$ = new Subject();
const webSocket$ = Observable.webSocket({
url: 'ws://localhost:8080',
closeObserver: webSocketClose$
});
export const fetchMessagesEpic = (action$) =>
action$.ofType(FETCH.MESSAGES)
.switchMap((action) => {
const message$ = webSocket$
.map((msg) => addMessage(msg))
.catch((e) => Observable.of({
type: 'SOCKET_ERROR',
error: e
}));
const close$ = webSocketClose$
.take(1) // you probably only want one
.map((event) => ({
type: 'SOCKET_CLOSE_EVENT',
wasClean: event.wasClean,
code: event.code,
reason: event.reason
}));
return Observable.merge(message$, close$);
});
这段代码做了几个假设:
- 您要处理
event.wasClean === true
错误。此示例同时使用 closeObserver 和 catch,但您不必这样做。目前尚不清楚您是只关心不干净的关闭还是所有关闭。了解不干净的 CloseEvents 并不是 WebSocketSubject 可能抛出的唯一错误也很重要。来自 socket.onerror
的错误以及 JS 异常都会发送到那里。
- 我们对
webSocket$
的订阅将在套接字关闭时完成(),它确实如此。如果没有,我们想要 webSocket$.takeUntil(webSocketClose$).map(...)
另外,我想指出 rxjs 的 catch 运算符要求您 return 一个 Observable,但是您的示例 returned 一个布尔值。这会产生错误。如果您只想记录错误而不是处理错误,请使用 do
运算符。
// BAD
.catch((err) => { console.log(err);return true; })
// GOOD
.do({ error: (err) => console.log(err) })
我目前正在使用 redux-observable
来处理 websocket 连接,但不知道什么是处理 close
事件的最佳方式。
const webSocket$ = Observable.webSocket({
url: 'ws://localhost:8080'
});
export const fetchMessagesEpic = (action$) =>
action$.ofType(FETCH.MESSAGES)
.switchMap((action) =>
webSocket$
.catch((err) => { console.log(err);return true; }) <-- I can see the close event here
.map((msg) => addMessage(msg)));
我应该在 catch
中处理它(从那里发送一个新动作,然后在同一史诗中使用 takeUntil
吗?
如果 CloseEvent's wasClean
值为 false,来自 rxjs 的 Observable.webSocket
(又名 WebSocketSubject)仅将关闭事件发送到错误路径。
socket.onclose = (e: CloseEvent) => {
this._resetState();
const closeObserver = this.closeObserver;
if (closeObserver) {
closeObserver.next(e);
}
// v------------------------ Right here
if (e.wasClean) {
observer.complete();
} else {
observer.error(e);
}
};
所以这意味着在你的情况下连接没有正确关闭。 CloseEvent 上有时会有一个 reason
字段,它可能会或可能不会为您提供洞察力,或者 Chrome Dev Tools Network 和 Console 选项卡可能也有更多。
当一个套接字被远程服务器完全关闭时,做一些特殊事情的正确方法是传入一个closeObserver
。直接是 { next: func }
观察者,或者更可能是 Subject(既是 Observable 又是观察者)
// a stream of CloseEvents
const webSocketClose$ = new Subject();
const webSocket$ = Observable.webSocket({
url: 'ws://localhost:8080',
closeObserver: webSocketClose$
});
export const fetchMessagesEpic = (action$) =>
action$.ofType(FETCH.MESSAGES)
.switchMap((action) => {
const message$ = webSocket$
.map((msg) => addMessage(msg))
.catch((e) => Observable.of({
type: 'SOCKET_ERROR',
error: e
}));
const close$ = webSocketClose$
.take(1) // you probably only want one
.map((event) => ({
type: 'SOCKET_CLOSE_EVENT',
wasClean: event.wasClean,
code: event.code,
reason: event.reason
}));
return Observable.merge(message$, close$);
});
这段代码做了几个假设:
- 您要处理
event.wasClean === true
错误。此示例同时使用 closeObserver 和 catch,但您不必这样做。目前尚不清楚您是只关心不干净的关闭还是所有关闭。了解不干净的 CloseEvents 并不是 WebSocketSubject 可能抛出的唯一错误也很重要。来自socket.onerror
的错误以及 JS 异常都会发送到那里。 - 我们对
webSocket$
的订阅将在套接字关闭时完成(),它确实如此。如果没有,我们想要webSocket$.takeUntil(webSocketClose$).map(...)
另外,我想指出 rxjs 的 catch 运算符要求您 return 一个 Observable,但是您的示例 returned 一个布尔值。这会产生错误。如果您只想记录错误而不是处理错误,请使用 do
运算符。
// BAD
.catch((err) => { console.log(err);return true; })
// GOOD
.do({ error: (err) => console.log(err) })