Akka-Http:使用 websockets - 扩展流程的模式

Akka-Http : Working with websockets - Patterns for extending the Flow

akka http websocket 工作得很好。它需要 Flow[Message, Message, Future[Done]

我们可以创建相同形状的 Source val src = Source.maybe 和 Sink val snk = Sink.foreach(...) 并调用 Flow.fromSinkAndSourceMat(snk, src)。 src用于向websocket(流源)发送数据,snk用于接收数据。

在某些情况下,我们希望扩展我们传递的流程,即 - 我们希望接收消息,将其发送到另一个流程以进行 JSON 解析或数据验证,然后最终发送到接收器。

如何构造我在 Http().singleWebSocketRequest(WebSocketRequest("ws://someip:port"), flow) 中传递的 Flow 以包含其他各种流阶段,然后是最终汇。 我可以将源设置为 Source.maybe,因为我只关心传入。

您可以将 Sink 组合成一系列 Flow 和最后的 Sink

val flow1: Flow[Message, Message, NotUsed] = ???
val flow2: Flow[Message, String, NotUsed] = ???
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println) 

val megaSink = flow1.via(flow2).to(sink)

你的组合 Sink 然后可以用来组合你的 WS 流,类似于你已经拥有的:

Flow.fromSinkAndSource(megaSink, Source.maybe)