Akka-http 方法来处理 websocket 命令
Akka-http approach to handle websocket commands
假设我有一个控制器来处理我从 websocket 收到的命令。
class WebSocketController(implicit M: Materializer)
extends Controller
with CirceSupport {
override def route: Route = path("ws") {
handleWebSocketMessages(wsFlow)
}
def wsFlow: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
case tm: TextMessage =>
decode[Command](tm.getStrictText) match {
// todo pass this command to some actor or service
// and get response and reply back.
case Right(AuthorizeUser(login, password)) =>
TextMessage(s"Authorized: $login, $password") :: Nil
case _ =>
Nil
}
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Nil
}
}
所以,我得到一个命令,反序列化它,下一步我想做的是将它传递给一些服务或演员,它将 return 我 Future[SomeReply]
.
问题是:
使用 akka 流处理此类流的基本方法是什么?
在 Flow
中处理 Future
时,mapAsync
通常就是您要查找的内容。添加到您的具体示例:
def asyncOp(msg: TextMessage): Future[SomeReply] = ???
def tailorResponse(r: SomeReply): TextMessage = ???
def wsFlow: Flow[Message, Message, Any] =
Flow[Message]
.mapConcat {...}
.mapAsync(parallelism = 4)(asyncOp)
.via(tailorResponse)
mapAsyncUnordered
也可以使用,以防 Future
结果的顺序不相关。
并行度表示在阶段背压之前可以同时运行多少Future
。
另见
假设我有一个控制器来处理我从 websocket 收到的命令。
class WebSocketController(implicit M: Materializer)
extends Controller
with CirceSupport {
override def route: Route = path("ws") {
handleWebSocketMessages(wsFlow)
}
def wsFlow: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
case tm: TextMessage =>
decode[Command](tm.getStrictText) match {
// todo pass this command to some actor or service
// and get response and reply back.
case Right(AuthorizeUser(login, password)) =>
TextMessage(s"Authorized: $login, $password") :: Nil
case _ =>
Nil
}
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Nil
}
}
所以,我得到一个命令,反序列化它,下一步我想做的是将它传递给一些服务或演员,它将 return 我 Future[SomeReply]
.
问题是: 使用 akka 流处理此类流的基本方法是什么?
在 Flow
中处理 Future
时,mapAsync
通常就是您要查找的内容。添加到您的具体示例:
def asyncOp(msg: TextMessage): Future[SomeReply] = ???
def tailorResponse(r: SomeReply): TextMessage = ???
def wsFlow: Flow[Message, Message, Any] =
Flow[Message]
.mapConcat {...}
.mapAsync(parallelism = 4)(asyncOp)
.via(tailorResponse)
mapAsyncUnordered
也可以使用,以防 Future
结果的顺序不相关。
并行度表示在阶段背压之前可以同时运行多少Future
。
另见