如何使用 Spring 5 Reactive WebSocket 检测断开连接的客户端

How can I detect a disconnected client with Spring 5 Reactive WebSocket

我设法使用 Spring 5 Reactive WebSocket 支持 (Chapter 23.2.4) 创建了一个 WebSocketHandler。接收和发送一切正常。但是,我不知道如何检测客户端断开连接。当调试客户端断开连接时,它在 HttpServerWSOperations class(netty.http.server 包)的服务器端某处停止,它确实检测到 CloseWebSocketFrame.

对如何处理客户端断开连接有什么建议吗?

您可以通过 afterConnectionClosed 方法检测到这一点。由于 handleTransportError 方法,您甚至可以处理传输错误。这是一个代码片段:

@Component
public class YourHandler extends TextWebSocketHandler {

    private final static Logger logger = LoggerFactory.getLogger(YourHandler .class);
    private List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        logger.debug("Connected : " + session);
        sessions.add(session);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        try {
            //check the close status if you want...
            if (!status.equals(CloseStatus.NORMAL)) {
                session.close();
            }
        } catch (IOException e) {
            logger.error("Cannot close session on afterConnectionClosed ", e);
        }
        sessions.remove(session);
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        logger.debug("error has occured with the following session {}", session);
        try {
            session.close();
        } catch (IOException e) {
            logger.error("Cannot close session on handleTransportError ", e);
        }
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        logger.debug("Receive : " + message.getPayload());      
    }
}

我在响应式 org.springframework.web.reactive.socket.WebSocketHandler 中实现了一个关闭事件处理程序,如下所示:

public Mono<Void> handle(final WebSocketSession session) {
    final String sessionId = session.getId();
    if(sessions.add(sessionId)) {  // add session id to set to keep a count of active sessions
        LOG.info("Starting WebSocket Session [{}]", sessionId);
        // Send the session id back to the client
        WebSocketMessage msg = session.textMessage(String.format("{\"session\":\"%s\"}", sessionId));
        // Register the outbound flux as the source of outbound messages
        final Flux<WebSocketMessage> outFlux = Flux.concat(Flux.just(msg), newMetricFlux.map(metric -> {
            LOG.info("Sending message to client [{}]: {}", sessionId, metric);
            return session.textMessage(metric);             
        }));
        // Subscribe to the inbound message flux
        session.receive().doFinally(sig -> {
            LOG.info("Terminating WebSocket Session (client side) sig: [{}], [{}]", sig.name(), sessionId);
            session.close();
            sessions.remove(sessionId);  // remove the stored session id
        }).subscribe(inMsg -> {
            LOG.info("Received inbound message from client [{}]: {}", sessionId, inMsg.getPayloadAsText());
        });
        return session.send(outFlux);
    }
    return Mono.empty();
}

newMetricFlux 字段是出站 websocket 消息的来源。挂钩关闭事件的技巧是入站消息通量上的 doFinally。当 websocket 客户端关闭时,入站通量终止。

但是,出于某种原因,在关闭 netty 通道和执行 doFinally 回调之间有 1 分钟的延迟。还不确定为什么。

这是浏览器客户端连接并立即关闭的日志输出。注意第 3 行和第 4 行之间的 60 秒延迟。

2017-08-03 11:15:41.177 DEBUG 28505 --- [ctor-http-nio-2] r.i.n.http.server.HttpServerOperations   : New http connection, requesting read
2017-08-03 11:15:41.294  INFO 28505 --- [ctor-http-nio-2] c.h.w.ws.NewMetricsWebSocketHandler      : Starting WebSocket Session [87fbe66]
2017-08-03 11:15:48.294 DEBUG 28505 --- [ctor-http-nio-2] r.i.n.http.server.HttpServerOperations   : CloseWebSocketFrame detected. Closing Websocket
2017-08-03 11:16:48.293  INFO 28505 --- [ctor-http-nio-2] c.h.w.ws.NewMetricsWebSocketHandler      : Terminating WebSocket Session (client side) sig: [ON_COMPLETE], [87fbe66]

更新:2017 年 10 月 13 日:

从 Spring 5 GA 开始,上述延迟不存在,我观察到我的回调在客户端关闭后立即被调用。不确定这是在哪个版本中修复的,但正如我所说,它已在 5.0 GA 中修复。