如何使用 Spring 5 Reactive WebSocket 检测断开连接的客户端
How can I detect a disconnected client with Spring 5 Reactive WebSocket
我设法使用 Spring 5 Reactive WebSocket 支持 (Chapter 23.2.4) 创建了一个 WebSocketHandler
。接收和发送一切正常。但是,我不知道如何检测客户端断开连接。当调试客户端断开连接时,它在 HttpServerWSOperations
class(netty.http.server
包)的服务器端某处停止,它确实检测到 CloseWebSocketFrame
.
对如何处理客户端断开连接有什么建议吗?
您可以通过 afterConnectionClosed
方法检测到这一点。由于 handleTransportError
方法,您甚至可以处理传输错误。这是一个代码片段:
@Component
public class YourHandler extends TextWebSocketHandler {
private final static Logger logger = LoggerFactory.getLogger(YourHandler .class);
private List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
logger.debug("Connected : " + session);
sessions.add(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
try {
//check the close status if you want...
if (!status.equals(CloseStatus.NORMAL)) {
session.close();
}
} catch (IOException e) {
logger.error("Cannot close session on afterConnectionClosed ", e);
}
sessions.remove(session);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
logger.debug("error has occured with the following session {}", session);
try {
session.close();
} catch (IOException e) {
logger.error("Cannot close session on handleTransportError ", e);
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
logger.debug("Receive : " + message.getPayload());
}
}
我在响应式 org.springframework.web.reactive.socket.WebSocketHandler 中实现了一个关闭事件处理程序,如下所示:
public Mono<Void> handle(final WebSocketSession session) {
final String sessionId = session.getId();
if(sessions.add(sessionId)) { // add session id to set to keep a count of active sessions
LOG.info("Starting WebSocket Session [{}]", sessionId);
// Send the session id back to the client
WebSocketMessage msg = session.textMessage(String.format("{\"session\":\"%s\"}", sessionId));
// Register the outbound flux as the source of outbound messages
final Flux<WebSocketMessage> outFlux = Flux.concat(Flux.just(msg), newMetricFlux.map(metric -> {
LOG.info("Sending message to client [{}]: {}", sessionId, metric);
return session.textMessage(metric);
}));
// Subscribe to the inbound message flux
session.receive().doFinally(sig -> {
LOG.info("Terminating WebSocket Session (client side) sig: [{}], [{}]", sig.name(), sessionId);
session.close();
sessions.remove(sessionId); // remove the stored session id
}).subscribe(inMsg -> {
LOG.info("Received inbound message from client [{}]: {}", sessionId, inMsg.getPayloadAsText());
});
return session.send(outFlux);
}
return Mono.empty();
}
newMetricFlux
字段是出站 websocket 消息的来源。挂钩关闭事件的技巧是入站消息通量上的 doFinally。当 websocket 客户端关闭时,入站通量终止。
但是,出于某种原因,在关闭 netty 通道和执行 doFinally
回调之间有 1 分钟的延迟。还不确定为什么。
这是浏览器客户端连接并立即关闭的日志输出。注意第 3 行和第 4 行之间的 60 秒延迟。
2017-08-03 11:15:41.177 DEBUG 28505 --- [ctor-http-nio-2] r.i.n.http.server.HttpServerOperations : New http connection, requesting read
2017-08-03 11:15:41.294 INFO 28505 --- [ctor-http-nio-2] c.h.w.ws.NewMetricsWebSocketHandler : Starting WebSocket Session [87fbe66]
2017-08-03 11:15:48.294 DEBUG 28505 --- [ctor-http-nio-2] r.i.n.http.server.HttpServerOperations : CloseWebSocketFrame detected. Closing Websocket
2017-08-03 11:16:48.293 INFO 28505 --- [ctor-http-nio-2] c.h.w.ws.NewMetricsWebSocketHandler : Terminating WebSocket Session (client side) sig: [ON_COMPLETE], [87fbe66]
更新:2017 年 10 月 13 日:
从 Spring 5 GA 开始,上述延迟不存在,我观察到我的回调在客户端关闭后立即被调用。不确定这是在哪个版本中修复的,但正如我所说,它已在 5.0 GA 中修复。
我设法使用 Spring 5 Reactive WebSocket 支持 (Chapter 23.2.4) 创建了一个 WebSocketHandler
。接收和发送一切正常。但是,我不知道如何检测客户端断开连接。当调试客户端断开连接时,它在 HttpServerWSOperations
class(netty.http.server
包)的服务器端某处停止,它确实检测到 CloseWebSocketFrame
.
对如何处理客户端断开连接有什么建议吗?
您可以通过 afterConnectionClosed
方法检测到这一点。由于 handleTransportError
方法,您甚至可以处理传输错误。这是一个代码片段:
@Component
public class YourHandler extends TextWebSocketHandler {
private final static Logger logger = LoggerFactory.getLogger(YourHandler .class);
private List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
logger.debug("Connected : " + session);
sessions.add(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
try {
//check the close status if you want...
if (!status.equals(CloseStatus.NORMAL)) {
session.close();
}
} catch (IOException e) {
logger.error("Cannot close session on afterConnectionClosed ", e);
}
sessions.remove(session);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
logger.debug("error has occured with the following session {}", session);
try {
session.close();
} catch (IOException e) {
logger.error("Cannot close session on handleTransportError ", e);
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
logger.debug("Receive : " + message.getPayload());
}
}
我在响应式 org.springframework.web.reactive.socket.WebSocketHandler 中实现了一个关闭事件处理程序,如下所示:
public Mono<Void> handle(final WebSocketSession session) {
final String sessionId = session.getId();
if(sessions.add(sessionId)) { // add session id to set to keep a count of active sessions
LOG.info("Starting WebSocket Session [{}]", sessionId);
// Send the session id back to the client
WebSocketMessage msg = session.textMessage(String.format("{\"session\":\"%s\"}", sessionId));
// Register the outbound flux as the source of outbound messages
final Flux<WebSocketMessage> outFlux = Flux.concat(Flux.just(msg), newMetricFlux.map(metric -> {
LOG.info("Sending message to client [{}]: {}", sessionId, metric);
return session.textMessage(metric);
}));
// Subscribe to the inbound message flux
session.receive().doFinally(sig -> {
LOG.info("Terminating WebSocket Session (client side) sig: [{}], [{}]", sig.name(), sessionId);
session.close();
sessions.remove(sessionId); // remove the stored session id
}).subscribe(inMsg -> {
LOG.info("Received inbound message from client [{}]: {}", sessionId, inMsg.getPayloadAsText());
});
return session.send(outFlux);
}
return Mono.empty();
}
newMetricFlux
字段是出站 websocket 消息的来源。挂钩关闭事件的技巧是入站消息通量上的 doFinally。当 websocket 客户端关闭时,入站通量终止。
但是,出于某种原因,在关闭 netty 通道和执行 doFinally
回调之间有 1 分钟的延迟。还不确定为什么。
这是浏览器客户端连接并立即关闭的日志输出。注意第 3 行和第 4 行之间的 60 秒延迟。
2017-08-03 11:15:41.177 DEBUG 28505 --- [ctor-http-nio-2] r.i.n.http.server.HttpServerOperations : New http connection, requesting read
2017-08-03 11:15:41.294 INFO 28505 --- [ctor-http-nio-2] c.h.w.ws.NewMetricsWebSocketHandler : Starting WebSocket Session [87fbe66]
2017-08-03 11:15:48.294 DEBUG 28505 --- [ctor-http-nio-2] r.i.n.http.server.HttpServerOperations : CloseWebSocketFrame detected. Closing Websocket
2017-08-03 11:16:48.293 INFO 28505 --- [ctor-http-nio-2] c.h.w.ws.NewMetricsWebSocketHandler : Terminating WebSocket Session (client side) sig: [ON_COMPLETE], [87fbe66]
更新:2017 年 10 月 13 日:
从 Spring 5 GA 开始,上述延迟不存在,我观察到我的回调在客户端关闭后立即被调用。不确定这是在哪个版本中修复的,但正如我所说,它已在 5.0 GA 中修复。