Akka-Http:使用 websockets - 扩展流程的模式
Akka-Http : Working with websockets - Patterns for extending the Flow
akka http websocket 工作得很好。它需要 Flow[Message, Message, Future[Done]
我们可以创建相同形状的 Source val src = Source.maybe
和 Sink val snk = Sink.foreach(...)
并调用 Flow.fromSinkAndSourceMat(snk, src)
。
src用于向websocket(流源)发送数据,snk用于接收数据。
在某些情况下,我们希望扩展我们传递的流程,即 - 我们希望接收消息,将其发送到另一个流程以进行 JSON 解析或数据验证,然后最终发送到接收器。
如何构造我在 Http().singleWebSocketRequest(WebSocketRequest("ws://someip:port"), flow)
中传递的 Flow 以包含其他各种流阶段,然后是最终汇。
我可以将源设置为 Source.maybe
,因为我只关心传入。
您可以将 Sink
组合成一系列 Flow
和最后的 Sink
。
val flow1: Flow[Message, Message, NotUsed] = ???
val flow2: Flow[Message, String, NotUsed] = ???
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val megaSink = flow1.via(flow2).to(sink)
你的组合 Sink
然后可以用来组合你的 WS 流,类似于你已经拥有的:
Flow.fromSinkAndSource(megaSink, Source.maybe)
akka http websocket 工作得很好。它需要 Flow[Message, Message, Future[Done]
我们可以创建相同形状的 Source val src = Source.maybe
和 Sink val snk = Sink.foreach(...)
并调用 Flow.fromSinkAndSourceMat(snk, src)
。
src用于向websocket(流源)发送数据,snk用于接收数据。
在某些情况下,我们希望扩展我们传递的流程,即 - 我们希望接收消息,将其发送到另一个流程以进行 JSON 解析或数据验证,然后最终发送到接收器。
如何构造我在 Http().singleWebSocketRequest(WebSocketRequest("ws://someip:port"), flow)
中传递的 Flow 以包含其他各种流阶段,然后是最终汇。
我可以将源设置为 Source.maybe
,因为我只关心传入。
您可以将 Sink
组合成一系列 Flow
和最后的 Sink
。
val flow1: Flow[Message, Message, NotUsed] = ???
val flow2: Flow[Message, String, NotUsed] = ???
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val megaSink = flow1.via(flow2).to(sink)
你的组合 Sink
然后可以用来组合你的 WS 流,类似于你已经拥有的:
Flow.fromSinkAndSource(megaSink, Source.maybe)