在 Akka 中使用 Strict 和 Streamed WebSocket 消息
Consuming both Strict and Streamed WebSocket Messages in Akka
我正在尝试使用 Akka HTTP 构建网络套接字服务。我需要处理完整到达的 Strict 消息,以及处理 m 个多帧到达的 Streamed 消息。我正在使用带有 handleWebSocketMessages() 的路由将 Web 套接字的处理传递给流。我的代码看起来像这样:
val route: Route =
get {
handleWebSocketMessages(createFlow())
}
def createFlow(): Flow[Message, Message, Any] = Flow[Message]
.collect {
case TextMessage.Strict(msg) ⇒ msg
case TextMessage.Streamed(stream) => ??? // <= What to do here??
}
.via(createActorFlow())
.map {
case msg: String ⇒ TextMessage.Strict(msg)
}
def createActorFlow(): Flow[String, String, Any] = {
// Set Up Actors
// ... (this is working)
Flow.fromSinkAndSource(in, out)
}
我不太确定两者如何处理 Strict 和 Streamed 消息。我意识到我可以做这样的事情:
.collect {
case TextMessage.Strict(msg) ⇒ Future.successful(msg)
case TextMessage.Streamed(stream) => stream.runFold("")(_ + _)
}
但现在我的流必须处理 Future[String] 而不仅仅是字符串,我当时不确定如何处理,尤其是因为显然我需要按顺序处理消息。
我确实看到了这个 akka 问题,它似乎有点相关,但不完全是我需要的(我不认为?)。
https://github.com/akka/akka/issues/20096
如有任何帮助,我们将不胜感激
弃牌听起来是个明智的选择。可以使用(例如)
处理流中的未来
flowOfFutures.mapAsync(parallelism = 3)(identity)
请注意,根据 docs。
,mapAsync 会保留传入消息的顺序
另一方面,处理流式 WS 消息的其他明智的预防措施可能是使用 completionTimeout 并限制绑定时间和 space 消息折叠(例如)
stream.limit(x).completionTimeout(5 seconds).runFold(...)
基于以下(感谢 svezfaz)答案的最终答案是这样的:
val route: Route =
get {
handleWebSocketMessages(createFlow())
}
def createFlow(): Flow[Message, Message, Any] = Flow[Message]
.collect {
case TextMessage.Strict(msg) ⇒
Future.successful(MyCaseClass(msg))
case TextMessage.Streamed(stream) => stream
.limit(100) // Max frames we are willing to wait for
.completionTimeout(5 seconds) // Max time until last frame
.runFold("")(_ + _) // Merges the frames
.flatMap(msg => Future.successful(MyCaseClass(msg)))
}
.mapAsync(parallelism = 3)(identity)
.via(createActorFlow())
.map {
case msg: String ⇒ TextMessage.Strict(msg)
}
def createActorFlow(): Flow[MyCaseClass, String, Any] = {
// Set Up Actors as source and sink (not shown)
Flow.fromSinkAndSource(in, out)
}
我正在尝试使用 Akka HTTP 构建网络套接字服务。我需要处理完整到达的 Strict 消息,以及处理 m 个多帧到达的 Streamed 消息。我正在使用带有 handleWebSocketMessages() 的路由将 Web 套接字的处理传递给流。我的代码看起来像这样:
val route: Route =
get {
handleWebSocketMessages(createFlow())
}
def createFlow(): Flow[Message, Message, Any] = Flow[Message]
.collect {
case TextMessage.Strict(msg) ⇒ msg
case TextMessage.Streamed(stream) => ??? // <= What to do here??
}
.via(createActorFlow())
.map {
case msg: String ⇒ TextMessage.Strict(msg)
}
def createActorFlow(): Flow[String, String, Any] = {
// Set Up Actors
// ... (this is working)
Flow.fromSinkAndSource(in, out)
}
我不太确定两者如何处理 Strict 和 Streamed 消息。我意识到我可以做这样的事情:
.collect {
case TextMessage.Strict(msg) ⇒ Future.successful(msg)
case TextMessage.Streamed(stream) => stream.runFold("")(_ + _)
}
但现在我的流必须处理 Future[String] 而不仅仅是字符串,我当时不确定如何处理,尤其是因为显然我需要按顺序处理消息。
我确实看到了这个 akka 问题,它似乎有点相关,但不完全是我需要的(我不认为?)。
https://github.com/akka/akka/issues/20096
如有任何帮助,我们将不胜感激
弃牌听起来是个明智的选择。可以使用(例如)
处理流中的未来flowOfFutures.mapAsync(parallelism = 3)(identity)
请注意,根据 docs。
,mapAsync 会保留传入消息的顺序另一方面,处理流式 WS 消息的其他明智的预防措施可能是使用 completionTimeout 并限制绑定时间和 space 消息折叠(例如)
stream.limit(x).completionTimeout(5 seconds).runFold(...)
基于以下(感谢 svezfaz)答案的最终答案是这样的:
val route: Route =
get {
handleWebSocketMessages(createFlow())
}
def createFlow(): Flow[Message, Message, Any] = Flow[Message]
.collect {
case TextMessage.Strict(msg) ⇒
Future.successful(MyCaseClass(msg))
case TextMessage.Streamed(stream) => stream
.limit(100) // Max frames we are willing to wait for
.completionTimeout(5 seconds) // Max time until last frame
.runFold("")(_ + _) // Merges the frames
.flatMap(msg => Future.successful(MyCaseClass(msg)))
}
.mapAsync(parallelism = 3)(identity)
.via(createActorFlow())
.map {
case msg: String ⇒ TextMessage.Strict(msg)
}
def createActorFlow(): Flow[MyCaseClass, String, Any] = {
// Set Up Actors as source and sink (not shown)
Flow.fromSinkAndSource(in, out)
}