如何异步接受 WebSocket?
How to accept a WebSocket asynchronously?
我有一个处理 WebSocket 请求的 Play 应用程序。路线文件包含这一行:
GET /testsocket controllers.HomeController.defaultRoomSocket
一个已经工作的同步版本如下所示:(改编自 2.7.x 文档)
public WebSocket defaultRoomSocket() {
return WebSocket.Text.accept(
request -> ActorFlow.actorRef(MyWebSocketActor::props, actorSystem, materializer));
}
public CompletionStage<WebSocket> defaultRoomSocket(){
//returning a CompletionStage here, using the "ask pattern"
//to get the needed Flow from an other Actor
}
从这里我运行进入以下问题:
Cannot use a method returning java.util.concurrent.CompletionStage[play.mvc.WebSocket] as a Handler for requests
此外,'WebSocket' 没有 TypeParameter,正如文档所建议的那样。接受 WebSocket 异步请求的合适方法是什么?
文档确实需要更新,我认为在 #5055.
的 websockets 重构中遗漏了一些位点
要获得异步处理,您应该使用 acceptOrResult
方法,该方法将 CompletionStage
作为 return 类型而不是流。这可以 return Result
或 Akka Flow
,使用函数式编程助手 (F.Either
)。事实上,下面是 accept
方法的实现方式:
public WebSocket accept(Function<Http.RequestHeader, Flow<In, Out, ?>> f) {
return acceptOrResult(
request -> CompletableFuture.completedFuture(F.Either.Right(f.apply(request))));
}
如您所见,它所做的只是使用 completedFuture
.
调用异步版本
要完全使其异步并达到我认为您想要实现的目标,您需要执行以下操作:
public WebSocket ws() {
return WebSocket.Json.acceptOrResult(request -> {
if (sameOriginCheck(request)) {
final CompletionStage<Flow<JsonNode, JsonNode, NotUsed>> future = wsFutureFlow(request);
final CompletionStage<Either<Result, Flow<JsonNode, JsonNode, ?>>> stage = future.thenApply(Either::Right);
return stage.exceptionally(this::logException);
} else {
return forbiddenResult();
}
});
}
@SuppressWarnings("unchecked")
private CompletionStage<Flow<JsonNode, JsonNode, NotUsed>> wsFutureFlow(Http.RequestHeader request) {
long id = request.asScala().id();
UserParentActor.Create create = new UserParentActor.Create(Long.toString(id));
return ask(userParentActor, create, t).thenApply((Object flow) -> {
final Flow<JsonNode, JsonNode, NotUsed> f = (Flow<JsonNode, JsonNode, NotUsed>) flow;
return f.named("websocket");
});
}
private CompletionStage<Either<Result, Flow<JsonNode, JsonNode, ?>>> forbiddenResult() {
final Result forbidden = Results.forbidden("forbidden");
final Either<Result, Flow<JsonNode, JsonNode, ?>> left = Either.Left(forbidden);
return CompletableFuture.completedFuture(left);
}
private Either<Result, Flow<JsonNode, JsonNode, ?>> logException(Throwable throwable) {
logger.error("Cannot create websocket", throwable);
Result result = Results.internalServerError("error");
return Either.Left(result);
}
(摘自 play-java-websocket-example,可能会有兴趣)
如您所见,在 returning websocket 连接或 HTTP 状态之前,它首先要经过几个阶段。
我有一个处理 WebSocket 请求的 Play 应用程序。路线文件包含这一行:
GET /testsocket controllers.HomeController.defaultRoomSocket
一个已经工作的同步版本如下所示:(改编自 2.7.x 文档)
public WebSocket defaultRoomSocket() {
return WebSocket.Text.accept(
request -> ActorFlow.actorRef(MyWebSocketActor::props, actorSystem, materializer));
}
public CompletionStage<WebSocket> defaultRoomSocket(){
//returning a CompletionStage here, using the "ask pattern"
//to get the needed Flow from an other Actor
}
从这里我运行进入以下问题:
Cannot use a method returning java.util.concurrent.CompletionStage[play.mvc.WebSocket] as a Handler for requests
此外,'WebSocket' 没有 TypeParameter,正如文档所建议的那样。接受 WebSocket 异步请求的合适方法是什么?
文档确实需要更新,我认为在 #5055.
的 websockets 重构中遗漏了一些位点要获得异步处理,您应该使用 acceptOrResult
方法,该方法将 CompletionStage
作为 return 类型而不是流。这可以 return Result
或 Akka Flow
,使用函数式编程助手 (F.Either
)。事实上,下面是 accept
方法的实现方式:
public WebSocket accept(Function<Http.RequestHeader, Flow<In, Out, ?>> f) {
return acceptOrResult(
request -> CompletableFuture.completedFuture(F.Either.Right(f.apply(request))));
}
如您所见,它所做的只是使用 completedFuture
.
要完全使其异步并达到我认为您想要实现的目标,您需要执行以下操作:
public WebSocket ws() {
return WebSocket.Json.acceptOrResult(request -> {
if (sameOriginCheck(request)) {
final CompletionStage<Flow<JsonNode, JsonNode, NotUsed>> future = wsFutureFlow(request);
final CompletionStage<Either<Result, Flow<JsonNode, JsonNode, ?>>> stage = future.thenApply(Either::Right);
return stage.exceptionally(this::logException);
} else {
return forbiddenResult();
}
});
}
@SuppressWarnings("unchecked")
private CompletionStage<Flow<JsonNode, JsonNode, NotUsed>> wsFutureFlow(Http.RequestHeader request) {
long id = request.asScala().id();
UserParentActor.Create create = new UserParentActor.Create(Long.toString(id));
return ask(userParentActor, create, t).thenApply((Object flow) -> {
final Flow<JsonNode, JsonNode, NotUsed> f = (Flow<JsonNode, JsonNode, NotUsed>) flow;
return f.named("websocket");
});
}
private CompletionStage<Either<Result, Flow<JsonNode, JsonNode, ?>>> forbiddenResult() {
final Result forbidden = Results.forbidden("forbidden");
final Either<Result, Flow<JsonNode, JsonNode, ?>> left = Either.Left(forbidden);
return CompletableFuture.completedFuture(left);
}
private Either<Result, Flow<JsonNode, JsonNode, ?>> logException(Throwable throwable) {
logger.error("Cannot create websocket", throwable);
Result result = Results.internalServerError("error");
return Either.Left(result);
}
(摘自 play-java-websocket-example,可能会有兴趣)
如您所见,在 returning websocket 连接或 HTTP 状态之前,它首先要经过几个阶段。