Spring 启动响应式 Websocket - 中断通量,直到收到来自客户端的所有信息
Spring Boot Reactive Websoket - Block out flux until received all information from client
我正在玩反应式 websocket(在 spring boot 2.1.0 上)但是我在尝试阻止等待客户端信息的通量时遇到问题。
我知道阻塞不是处理反应流的正确方法,但我需要在继续之前从客户端接收一些信息(即:授权密钥、ID),有一种可接受的方法来管理它以被动的方式?
例如:我希望客户端发送一个授权密钥和一个订阅 ID(只订阅一个特定的事件),只有当我有这两个信息时我才会发送通量,或者如果信息关闭会话无效
我尝试在 handle 方法中管理检查
webSocketSession.receive().subscribe(inMsg -> {
if(!inMsg.getPayloadAsText().equals("test-key")) {
log.info("AUTHORIZATION ERROR");
webSocketSession.close();
}
});
但是这种方式行不通而且不正确,因为它以 "async" 方式管理会话终止,而且无论如何,当收到带有错误密钥的消息时,会话仍保持活动状态
另一种方法是 "session" 使用内存存储来跟踪收到的信息并在业务逻辑级别处理此信息
我一直在寻找一种正确的方法来以被动方式管理它
我的起点是这个例子:http://www.baeldung.com/spring-5-reactive-websockets
提前致谢
其他信息:
我已经创建了一个基本的 spring 引导应用程序,并且我已经为 websocket 添加了一个配置 class:
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter-test", new MyWebSocketHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1); // before annotated controllers
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
下面是主要的class,包含主要的websocket方法(用我的实际代码修改):
@Component
public class MyWebSocketHandler implements WebSocketHandler {
@Autowired
private WebSocketHandler webSocketHandler;
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter-test", webSocketHandler);
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(1);
handlerMapping.setUrlMap(map);
return handlerMapping;
}
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
webSocketSession.receive().subscribe(inMsg -> {
if(!inMsg.getPayloadAsText().equals("test-key")) {
// log.info("AUTHORIZATION ERROR");
webSocketSession.close();
}
});
List<String> data = new ArrayList<String>(Arrays.asList("{A}", "{B}", "{C}"));
Flux<String> intervalFlux = Flux
.interval(Duration.ofMillis(500))
.map(tick -> {
if (tick < data.size())
return "item " + tick + ": " + data.get(tick.intValue());
return "Done (tick == data.size())";
});
return webSocketSession.send(intervalFlux
.map(webSocketSession::textMessage));
}
}
你不应该 subscribe
也不 block
在反应管道中 - 你可以发现你在这样的管道中,因为处理程序方法的 return 类型是 Mono<Void>
,表示传入消息处理完成的信号。
对于您的情况,您可能想要阅读第一条消息,检查它是否包含您期望的订阅信息,然后发送消息。
public class TestWebSocketHandler implements WebSocketHandler {
public Mono<Void> handle(WebSocketSession session) {
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(msg -> {
SubscriptionInfo info = extract(msg);
if (info == null) {
return session.close();
}
else {
Mono<WebSocketMessage> message = Mono.just(session.textMessage("message"));
return session.send(message);
}
})
.then();
}
SubscriptionInfo extract(String message) {
//
}
class SubscriptionInfo {
//
}
}
我正在玩反应式 websocket(在 spring boot 2.1.0 上)但是我在尝试阻止等待客户端信息的通量时遇到问题。
我知道阻塞不是处理反应流的正确方法,但我需要在继续之前从客户端接收一些信息(即:授权密钥、ID),有一种可接受的方法来管理它以被动的方式?
例如:我希望客户端发送一个授权密钥和一个订阅 ID(只订阅一个特定的事件),只有当我有这两个信息时我才会发送通量,或者如果信息关闭会话无效
我尝试在 handle 方法中管理检查
webSocketSession.receive().subscribe(inMsg -> {
if(!inMsg.getPayloadAsText().equals("test-key")) {
log.info("AUTHORIZATION ERROR");
webSocketSession.close();
}
});
但是这种方式行不通而且不正确,因为它以 "async" 方式管理会话终止,而且无论如何,当收到带有错误密钥的消息时,会话仍保持活动状态
另一种方法是 "session" 使用内存存储来跟踪收到的信息并在业务逻辑级别处理此信息
我一直在寻找一种正确的方法来以被动方式管理它
我的起点是这个例子:http://www.baeldung.com/spring-5-reactive-websockets
提前致谢
其他信息:
我已经创建了一个基本的 spring 引导应用程序,并且我已经为 websocket 添加了一个配置 class:
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter-test", new MyWebSocketHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1); // before annotated controllers
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
下面是主要的class,包含主要的websocket方法(用我的实际代码修改):
@Component
public class MyWebSocketHandler implements WebSocketHandler {
@Autowired
private WebSocketHandler webSocketHandler;
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter-test", webSocketHandler);
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(1);
handlerMapping.setUrlMap(map);
return handlerMapping;
}
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
webSocketSession.receive().subscribe(inMsg -> {
if(!inMsg.getPayloadAsText().equals("test-key")) {
// log.info("AUTHORIZATION ERROR");
webSocketSession.close();
}
});
List<String> data = new ArrayList<String>(Arrays.asList("{A}", "{B}", "{C}"));
Flux<String> intervalFlux = Flux
.interval(Duration.ofMillis(500))
.map(tick -> {
if (tick < data.size())
return "item " + tick + ": " + data.get(tick.intValue());
return "Done (tick == data.size())";
});
return webSocketSession.send(intervalFlux
.map(webSocketSession::textMessage));
}
}
你不应该 subscribe
也不 block
在反应管道中 - 你可以发现你在这样的管道中,因为处理程序方法的 return 类型是 Mono<Void>
,表示传入消息处理完成的信号。
对于您的情况,您可能想要阅读第一条消息,检查它是否包含您期望的订阅信息,然后发送消息。
public class TestWebSocketHandler implements WebSocketHandler {
public Mono<Void> handle(WebSocketSession session) {
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(msg -> {
SubscriptionInfo info = extract(msg);
if (info == null) {
return session.close();
}
else {
Mono<WebSocketMessage> message = Mono.just(session.textMessage("message"));
return session.send(message);
}
})
.then();
}
SubscriptionInfo extract(String message) {
//
}
class SubscriptionInfo {
//
}
}