graphql-spring-boot-starter application with only websockets
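I am building a GraphQL application with spring-boot-starter-webflux 2.5.6 and com.graphql-java-kickstart:graphql-spring-boot-starter:12.0.0.
Because com.graphql-java-kickstart is easy to get started with, the application is running fine.
Using HTTP requests I can call queries and run mutations, and I can even create and receive updates through subscriptions over websockets.
But for my application, queries and mutations must also run over the websocket.
It seems that com.graphql-java-kickstart:graphql-spring-boot-starter only lets you configure the subscription endpoint as a websocket.
Adding an additional websocket via 'extends Endpoint' and '@ServerEndpoint' had no effect.
I also tried adding my own HandlerMapping: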
    @PostConstruct
    public void init()
    {
        Map<String, Object> map = new HashMap<String, Object>(
                ((SimpleUrlHandlerMapping) webSocketHandlerMapping).getUrlMap());
        map.put("/mysocket", myWebSocketHandler);
        //map.put("/graphql", myWebSocketHandler);
        ((SimpleUrlHandlerMapping) webSocketHandlerMapping).setUrlMap(map);
        ((SimpleUrlHandlerMapping) webSocketHandlerMapping).initApplicationContext();
    }
This seems to work for the /mysocket path, but how do I enable it for /graphql? There already seems to be a handler listening there:
WARN 12168 --- [ctor-http-nio-2] notprivacysafe.graphql.GraphQL : Query failed to parse : ''
And how do I connect the websocket to my GraphQLMutationResolvers?
My starting point for solving this was to create a RestController and connect the ServerWebExchange to a WebSocketHandler through the WebSocketService, as shown below:
    @RestController
    @RequestMapping("/")
    public class WebSocketController
    {
        private static final Logger logger = LoggerFactory.getLogger(WebSocketController.class);

        private final GraphQLObjectMapper objectMapper;
        private final GraphQLInvoker graphQLInvoker;
        private final GraphQLSpringInvocationInputFactory invocationInputFactory;
        private final WebSocketService service;

        @Autowired
        public WebSocketController(GraphQLObjectMapper objectMapper, GraphQLInvoker graphQLInvoker,
                GraphQLSpringInvocationInputFactory invocationInputFactory, WebSocketService service)
        {
            this.objectMapper = objectMapper;
            this.graphQLInvoker = graphQLInvoker;
            this.invocationInputFactory = invocationInputFactory;
            this.service = service;
        }

        @GetMapping("${graphql.websocket.path:graphql-ws}")
        public Mono<Void> getMono(ServerWebExchange exchange)
        {
            logger.debug("New connection via GET");
            return service.handleRequest(exchange,
                    new GraphQLWebsocketMessageConsumer(exchange, objectMapper, graphQLInvoker, invocationInputFactory));
        }

        @PostMapping("${graphql.websocket.path:graphql-ws}")
        public Mono<Void> postMono(ServerWebExchange exchange)
        {
            ...
        }
    }
In this prototype state, the WebSocketHandler also implements Consumer, which is called to process each WebSocketMessage:
    public class GraphQLWebsocketMessageConsumer implements Consumer<String>, WebSocketHandler
    {
        private static final Logger logger = LoggerFactory.getLogger(GraphQLWebsocketMessageConsumer.class);

        private final ServerWebExchange swe;
        private final GraphQLObjectMapper objectMapper;
        private final GraphQLInvoker graphQLInvoker;
        private final GraphQLSpringInvocationInputFactory invocationInputFactory;
        private final Sinks.Many<String> publisher;

        public GraphQLWebsocketMessageConsumer(ServerWebExchange swe, GraphQLObjectMapper objectMapper,
                GraphQLInvoker graphQLInvoker, GraphQLSpringInvocationInputFactory invocationInputFactory)
        {
            ...
            // Multicast sink without buffering: results are only pushed to subscribers that have demand
            publisher = Sinks.many().multicast().directBestEffort();
        }

        @Override
        public Mono<Void> handle(WebSocketSession webSocketSession)
        {
            // Pass every incoming text frame to accept(String)
            Mono<Void> input = webSocketSession.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(this).then();
            // Send everything emitted into the sink back to the client as text frames
            Mono<Void> sender = webSocketSession.send(publisher.asFlux().map(webSocketSession::textMessage));
            return Mono.zip(input, sender).then();
        }

        @Override
        public void accept(String body)
        {
            try
            {
                String query = extractQuery(body);
                if(query == null)
                {
                    return;
                }
                GraphQLRequest request = objectMapper.readGraphQLRequest(query);
                GraphQLSingleInvocationInput invocationInput = invocationInputFactory.create(request, swe);
                // Execute asynchronously and emit the serialized result into the sink
                Mono<ExecutionResult> executionResult = Mono.fromCompletionStage(graphQLInvoker.executeAsync(invocationInput));
                Mono<String> jsonResult = executionResult.map(objectMapper::serializeResultAsJson);
                jsonResult.subscribe(publisher::tryEmitNext);
            } catch (Exception e)
            {
                ...
            }
        }

        @SuppressWarnings("unchecked")
        private String extractQuery(final String query) throws Exception
        {
            Map<String, Object> map = (Map<String, Object>) objectMapper.getJacksonMapper().readValue(query, Map.class);
            ...
            return queryPart;
        }

        @Override
        public List<String> getSubProtocols()
        {
            logger.debug("getSubProtocols called");
            return Collections.singletonList("graphql-ws");
        }
    }
This solution does not yet address security aspects such as authentication or session handling.
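The body of extractQuery is elided above, so the following is only a sketch of what unwrapping a message might involve. It assumes the client speaks the subscriptions-transport-ws flavour of the "graphql-ws" subprotocol, where an operation arrives as a "start" message with "id" and "payload" fields and a result goes back as a "data" message with the same id. The class GraphQLWsEnvelope and its methods are hypothetical names, not part of graphql-java-kickstart, and it works on a Map that has already been parsed (for example by Jackson, as in extractQuery).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of graphql-java-kickstart. */
final class GraphQLWsEnvelope {

    private GraphQLWsEnvelope() {}

    /**
     * Returns the "payload" object of a "start" message, or null for any other
     * message type (for example "connection_init" or "stop") or a missing payload.
     * Assumption: messages follow the subscriptions-transport-ws envelope.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> startPayload(Map<String, Object> message) {
        if (!"start".equals(message.get("type"))) {
            return null;
        }
        Object payload = message.get("payload");
        return payload instanceof Map ? (Map<String, Object>) payload : null;
    }

    /** Wraps an execution result in a "data" message that carries the operation id. */
    static Map<String, Object> dataMessage(Object id, Object executionResult) {
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("type", "data");
        out.put("id", id);
        out.put("payload", executionResult);
        return out;
    }
}
```

Echoing the id back would let a client match each response to the query or mutation that caused it, which the prototype above does not do because it emits only the serialized result.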