Akka-http 方法来处理 websocket 命令

Akka-http approach to handle websocket commands

假设我有一个控制器来处理我从 websocket 收到的命令。

class WebSocketController(implicit M: Materializer)
  extends Controller
    with CirceSupport {

  override def route: Route = path("ws") {
    handleWebSocketMessages(wsFlow)
  }

  def wsFlow: Flow[Message, Message, Any] =
    Flow[Message].mapConcat {
      case tm: TextMessage =>
        decode[Command](tm.getStrictText) match {
          // todo pass this command to some actor or service 
          // and get response and reply back.  
          case Right(AuthorizeUser(login, password)) =>
            TextMessage(s"Authorized: $login, $password") :: Nil
          case _ =>
            Nil
        }

      case bm: BinaryMessage =>
        bm.dataStream.runWith(Sink.ignore)
        Nil
    }

}

所以,我得到一个命令,反序列化它,下一步我想做的是将它传递给一些服务或演员,它将 return 我 Future[SomeReply].

问题是: 使用 akka 流处理此类流的基本方法是什么?

Flow 中处理 Future 时,mapAsync 通常就是您要查找的内容。添加到您的具体示例:

def asyncOp(msg: TextMessage): Future[SomeReply] = ???
def tailorResponse(r: SomeReply): TextMessage = ???

def wsFlow: Flow[Message, Message, Any] =
  Flow[Message]
   .mapConcat {...}
   .mapAsync(parallelism = 4)(asyncOp)
   .via(tailorResponse)

mapAsyncUnordered 也可以使用,以防 Future 结果的顺序不相关。 并行度表示在阶段背压之前可以同时运行多少Future

另见

  • 阶段docs
  • 如何与ask结合使用 - here