在 Akka 中使用 Strict 和 Streamed WebSocket 消息

Consuming both Strict and Streamed WebSocket Messages in Akka

我正在尝试使用 Akka HTTP 构建网络套接字服务。我需要处理完整到达的 Strict 消息,以及处理 m 个多帧到达的 Streamed 消息。我正在使用带有 handleWebSocketMessages() 的路由将 Web 套接字的处理传递给流。我的代码看起来像这样:

val route: Route =
  get {
    handleWebSocketMessages(createFlow())
  }

def createFlow(): Flow[Message, Message, Any] = Flow[Message]
  .collect {
    case TextMessage.Strict(msg) ⇒ msg
    case TextMessage.Streamed(stream) => ??? // <= What to do here??
  }
  .via(createActorFlow())
  .map {
    case msg: String ⇒ TextMessage.Strict(msg)
  }

def createActorFlow(): Flow[String, String, Any] = {
  // Set Up Actors
  // ... (this is working)
  Flow.fromSinkAndSource(in, out)
}

我不太确定两者如何处理 Strict 和 Streamed 消息。我意识到我可以做这样的事情:

  .collect {
    case TextMessage.Strict(msg) ⇒ Future.successful(msg)
    case TextMessage.Streamed(stream) => stream.runFold("")(_ + _)
  }

但现在我的流必须处理 Future[String] 而不仅仅是字符串,我当时不确定如何处理,尤其是因为显然我需要按顺序处理消息。

我确实看到了这个 akka 问题,它似乎有点相关,但不完全是我需要的(我不认为?)。

https://github.com/akka/akka/issues/20096

如有任何帮助,我们将不胜感激

弃牌听起来是个明智的选择。可以使用(例如)

处理流中的未来
flowOfFutures.mapAsync(parallelism = 3)(identity)

请注意,根据 docs

,mapAsync 会保留传入消息的顺序

另一方面,处理流式 WS 消息的其他明智的预防措施可能是使用 completionTimeout 并限制绑定时间和 space 消息折叠(例如)

stream.limit(x).completionTimeout(5 seconds).runFold(...)

基于以下(感谢 svezfaz)答案的最终答案是这样的:

val route: Route =
  get {
    handleWebSocketMessages(createFlow())
  }

def createFlow(): Flow[Message, Message, Any] = Flow[Message]
  .collect {
    case TextMessage.Strict(msg) ⇒ 
      Future.successful(MyCaseClass(msg))
    case TextMessage.Streamed(stream) => stream
      .limit(100)                   // Max frames we are willing to wait for
      .completionTimeout(5 seconds) // Max time until last frame
      .runFold("")(_ + _)           // Merges the frames
      .flatMap(msg => Future.successful(MyCaseClass(msg)))
  }
  .mapAsync(parallelism = 3)(identity)
  .via(createActorFlow())
  .map {
    case msg: String ⇒ TextMessage.Strict(msg)
  }

def createActorFlow(): Flow[MyCaseClass, String, Any] = {
  // Set Up Actors as source and sink (not shown)
  Flow.fromSinkAndSource(in, out)
}