Play 2.6 WebSocket proxy using Akka - How to handle Streamed Text vs Strict Text
While writing a WebSocket proxy in Play 2.6 (based on ), I ran into a problem handling streamed text.
Relevant code:
def proxySocket: WebSocket = WebSocket.accept[String, String] { _ =>
  Flow[String].map(s => TextMessage(s))
    .via(websocketFlow)
    .map(_.asTextMessage.getStrictText)
}
This works when proxying to a local WebSocket server. When proxying to a remote server, however, it fails with the following error:
java.lang.IllegalStateException: Cannot get strict text for streamed message.
We can get the streamed message via _.asTextMessage.getStreamedText, but I can't figure out how to convert it to a String.
As the documentation for Akka HTTP (the underlying engine in Play) states, you cannot expect messages to always be Strict:
When receiving data from the network connection the WebSocket implementation tries to create a Strict message whenever possible, i.e. when the complete data was received in one chunk. However, the actual chunking of messages over a network connection and through the various streaming abstraction layers is not deterministic from the perspective of the application. Therefore, application code must be able to handle both streamed and strict messages and not expect certain messages to be strict. (Particularly, note that tests against localhost will behave differently than tests against remote peers where data is received over a physical network connection.)
To handle both Strict and Streamed messages, you can do something like the following (inspired by this ):
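// Assumes an implicit Materializer is in scope, plus:
//   import scala.concurrent.Future
//   import scala.concurrent.duration._
def proxySocket: WebSocket = WebSocket.accept[String, String] { _ =>
  Flow[String]
    .map(TextMessage(_))
    .via(websocketFlow)
    .collect { // binary messages match neither case and are dropped
      // The whole message arrived in one chunk
      case TextMessage.Strict(text) =>
        Future.successful(text)
      // The message arrived in chunks: concatenate them into one String
      case TextMessage.Streamed(textStream) =>
        textStream
          .limit(100)                    // fail if a message has more than 100 chunks
          .completionTimeout(10.seconds) // fail if a message takes longer than 10 s
          .runFold("")(_ + _)
    }
    .mapAsync(parallelism = 3)(identity) // unwrap the Futures, preserving order
}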
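Since localhost tends to deliver Strict messages, the Streamed branch is easy to leave untested. The following is a minimal sketch of one way to exercise both branches without a remote peer, by building a Streamed message by hand. The object name StreamedTextDemo and the helper toText are hypothetical names introduced here for illustration, and the sketch assumes akka-http and akka-stream at the versions bundled with Play 2.6 (where ActorMaterializer is the usual way to obtain a Materializer).

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object StreamedTextDemo {

  // Hypothetical helper (not part of Play or Akka HTTP): collapse either kind
  // of text message into a single String.
  def toText(message: TextMessage)(implicit mat: Materializer): Future[String] =
    message match {
      case TextMessage.Strict(text) =>
        // Already complete, nothing to run
        Future.successful(text)
      case TextMessage.Streamed(textStream) =>
        textStream
          .limit(100)                    // fail on more than 100 chunks
          .completionTimeout(10.seconds) // fail if the message never completes
          .runFold("")(_ + _)            // concatenate chunks in arrival order
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("streamed-text-demo")
    implicit val mat: Materializer = ActorMaterializer()

    val strict = TextMessage.Strict("hello")
    // Simulates a message that was split into three chunks on the wire
    val streamed = TextMessage.Streamed(Source(List("Hello, ", "wor", "ld")))

    try {
      println(Await.result(toText(strict), 3.seconds))   // prints "hello"
      println(Await.result(toText(streamed), 3.seconds)) // prints "Hello, world"
    } finally {
      system.terminate()
    }
  }
}
```

Note that limit(100) counts chunks, not characters, so it bounds how many pieces a message may arrive in rather than its total size. The values 100 and 10 seconds are taken from the answer above and would need tuning for real traffic.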