rxjs 和 websocket - 我需要心跳吗?
rxjs and websocket - Do I need a heartbeat?
我对 ReactiveX 和处理套接字事件还很陌生。我不断偶然发现的一件事是,在处理 websocket 时需要保持活动的心跳功能。
我在网上搜索时发现保活的原因可能包括以下原因:
- 我们需要能够检测到服务器的连接是否中断
- 我们需要保持连接打开,我们可以通过心跳来做到这一点。 (不是一直开着直到我手动关闭连接吗?)
目前,我正在用 rxjs 实现它,我即将实现心跳,但后来我开始怀疑这是否真的有必要?
在查看 rxjs 库时,可以使用 rxjs 套接字主题提供的观察者非常轻松地处理关闭事件,即:
interface WebSocketSubjectConfig<T> {
url: string
protocol?: string | Array<string>
resultSelector?: (e: MessageEvent) => T
serializer?: (value: T) => WebSocketMessage
deserializer?: (e: MessageEvent) => T
openObserver?: NextObserver<Event>
closeObserver?: NextObserver<CloseEvent>
closingObserver?: NextObserver<void>
WebSocketCtor?: {...}
binaryType?: 'blob' | 'arraybuffer'
}
因此,如果我遇到连接问题,我可以轻松地为 closeObserver
创建一个 Observable,它可以侦听任何关闭事件并随后触发重新连接循环。
我什至用我的本地 API 测试了这个,我可以看到每次我在我的 API 中模拟“错误”时都会触发该事件。
因此我的问题是:
- 我是不是遗漏了什么,rxjs 是否已经为我处理了这个问题,并在 return 中为我提供了
closeObserver
?
- 如果我已经拥有的东西看起来不错,为什么我还需要实施乒乓模式? (有没有我没看清的角落?)
Isn't it always open until I manually close the connection?
是的,没错。除非您手动关闭它,或者过程中出现错误,否则连接将保持打开状态。不需要心跳机制。
We need to be able to detect if the connection to the server breaks
down
这实际上是一个很好的问题,因为它可能不会立即显而易见。那么让我们看看WebSocketSubject
:
的实际代码
/**
* An Observer then watches when close events occur on the underlying webSocket
*/
closeObserver?: NextObserver<CloseEvent>;
所以这个总是在关闭事件到来时被调用。然后是自己的例子:
* **closeObserver** allows us to set a custom error when an error raise up.
* import { webSocket } from 'rxjs/webSocket';
*
* const wsSubject = webSocket({
* url: 'ws://localhost:8081',
* closeObserver: {
next(closeEvent) {
const customError = { code: 6666, reason: "Custom evil reason" }
console.log(`code: ${customError.code}, reason: ${customError.reason}`);
}
}
* });
*
* //output
* // code: 6666, reason: Custom evil reason
其中显示了他们如何使用 closeObserver
Obs 来隧道化 customError。但到目前为止,关键是 closeObserver
不会处理所有关闭 scenarios/paths.
但是如果我们查看源文件,我们会看到:
socket.onerror = (e: Event) => {
this._resetState();
observer.error(e);
};
socket.onclose = (e: CloseEvent) => {
this._resetState();
const { closeObserver } = this._config;
if (closeObserver) {
closeObserver.next(e);
}
if (e.wasClean) {
observer.complete();
} else {
observer.error(e);
}
};
其中 const observer = this._output;
和 this._output = new Subject<T>();
。因此,正如我们所见,如果关闭事件不是干净的,则实际的 'core' 主题会收到错误信号。
这意味着,在内部,RxJS WS 会处理并公开捕获任何类型的服务器断开连接所需的一切,因此自定义心跳机制只是多余的。
顺便说一句,根据我的个人经验,大多数 WS 客户端的行为相似,只有服务器实现可能真正受益于自定义心跳系统(现在甚至可能所有事情都由 libs/framework 处理,我不知道)。
奖金
使用 RxJS WS 的正确(并且更复杂)connect/reconnect 流程的可能实现:
this.wsSubject = webSocket(wsUrl);
this.wsSubscription = this.wsSubject.pipe(
retryWhen(errors =>
errors.pipe(
concatMap((error, i) =>
iif(
() => this.environment.webSockets.maxReconnectAttempts !== -1 &&
i >= this.environment.webSockets.maxReconnectAttempts,
throwError('WebSocket reconnecting retry limit exceeded!'),
of(error).pipe(
tap(() => {
this.disconnected = true;
this.log.warn('Trying to reconnect to WebSocket server...');
}),
delay(this.environment.webSockets.reconnectAttemptDelay)
)
)
)
)
),
tap(() => {
if (this.disconnected) {
this.disconnected = false;
this.log.info('Successfully re-connected to the WebSocket server.');
}
})
).subscribe(
(data) => this.handleNotification(data),
(err) => this.log.error(err),
() => this.log.warn('Connection to the WebSocket server was closed!')
);
}
我对 ReactiveX 和处理套接字事件还很陌生。我不断偶然发现的一件事是,在处理 websocket 时需要保持活动的心跳功能。
我在网上搜索时发现保活的原因可能包括以下原因:
- 我们需要能够检测到服务器的连接是否中断
- 我们需要保持连接打开,我们可以通过心跳来做到这一点。 (不是一直开着直到我手动关闭连接吗?)
目前,我正在用 rxjs 实现它,我即将实现心跳,但后来我开始怀疑这是否真的有必要?
在查看 rxjs 库时,可以使用 rxjs 套接字主题提供的观察者非常轻松地处理关闭事件,即:
interface WebSocketSubjectConfig<T> {
url: string
protocol?: string | Array<string>
resultSelector?: (e: MessageEvent) => T
serializer?: (value: T) => WebSocketMessage
deserializer?: (e: MessageEvent) => T
openObserver?: NextObserver<Event>
closeObserver?: NextObserver<CloseEvent>
closingObserver?: NextObserver<void>
WebSocketCtor?: {...}
binaryType?: 'blob' | 'arraybuffer'
}
因此,如果我遇到连接问题,我可以轻松地为 closeObserver
创建一个 Observable,它可以侦听任何关闭事件并随后触发重新连接循环。
我什至用我的本地 API 测试了这个,我可以看到每次我在我的 API 中模拟“错误”时都会触发该事件。
因此我的问题是:
- 我是不是遗漏了什么,rxjs 是否已经为我处理了这个问题,并在 return 中为我提供了
closeObserver
? - 如果我已经拥有的东西看起来不错,为什么我还需要实施乒乓模式? (有没有我没看清的角落?)
Isn't it always open until I manually close the connection?
是的,没错。除非您手动关闭它,或者过程中出现错误,否则连接将保持打开状态。不需要心跳机制。
We need to be able to detect if the connection to the server breaks down
这实际上是一个很好的问题,因为它可能不会立即显而易见。那么让我们看看WebSocketSubject
:
/**
* An Observer then watches when close events occur on the underlying webSocket
*/
closeObserver?: NextObserver<CloseEvent>;
所以这个总是在关闭事件到来时被调用。然后是自己的例子:
* **closeObserver** allows us to set a custom error when an error raise up.
* import { webSocket } from 'rxjs/webSocket';
*
* const wsSubject = webSocket({
* url: 'ws://localhost:8081',
* closeObserver: {
next(closeEvent) {
const customError = { code: 6666, reason: "Custom evil reason" }
console.log(`code: ${customError.code}, reason: ${customError.reason}`);
}
}
* });
*
* //output
* // code: 6666, reason: Custom evil reason
其中显示了他们如何使用 closeObserver
Obs 来隧道化 customError。但到目前为止,关键是 closeObserver
不会处理所有关闭 scenarios/paths.
但是如果我们查看源文件,我们会看到:
socket.onerror = (e: Event) => {
this._resetState();
observer.error(e);
};
socket.onclose = (e: CloseEvent) => {
this._resetState();
const { closeObserver } = this._config;
if (closeObserver) {
closeObserver.next(e);
}
if (e.wasClean) {
observer.complete();
} else {
observer.error(e);
}
};
其中 const observer = this._output;
和 this._output = new Subject<T>();
。因此,正如我们所见,如果关闭事件不是干净的,则实际的 'core' 主题会收到错误信号。
这意味着,在内部,RxJS WS 会处理并公开捕获任何类型的服务器断开连接所需的一切,因此自定义心跳机制只是多余的。
顺便说一句,根据我的个人经验,大多数 WS 客户端的行为相似,只有服务器实现可能真正受益于自定义心跳系统(现在甚至可能所有事情都由 libs/framework 处理,我不知道)。
奖金
使用 RxJS WS 的正确(并且更复杂)connect/reconnect 流程的可能实现:
this.wsSubject = webSocket(wsUrl);
this.wsSubscription = this.wsSubject.pipe(
retryWhen(errors =>
errors.pipe(
concatMap((error, i) =>
iif(
() => this.environment.webSockets.maxReconnectAttempts !== -1 &&
i >= this.environment.webSockets.maxReconnectAttempts,
throwError('WebSocket reconnecting retry limit exceeded!'),
of(error).pipe(
tap(() => {
this.disconnected = true;
this.log.warn('Trying to reconnect to WebSocket server...');
}),
delay(this.environment.webSockets.reconnectAttemptDelay)
)
)
)
)
),
tap(() => {
if (this.disconnected) {
this.disconnected = false;
this.log.info('Successfully re-connected to the WebSocket server.');
}
})
).subscribe(
(data) => this.handleNotification(data),
(err) => this.log.error(err),
() => this.log.warn('Connection to the WebSocket server was closed!')
);
}