如何在 akka websocket 中使用 Framing

How to use Framing with akka websocket

我是 akka-stream 的新手,我正在尝试将 Framing.lengthField 与 websocket https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala

一起使用

这是我的资料:

path("websocket") {
    handleWebSocketMessages(websocketFlow)
} 

def websocketFlow(): Flow[Message, Message, Any] =
Flow.fromGraph(
  GraphDSL.create(Source.actorRef[TestMessage](bufferSize = 10, OverflowStrategy.fail)) {
    implicit builder =>
      source =>

        val binaryMessageDecoder = Framing.lengthField(4, 0, 1024 * 32, ByteOrder.BIG_ENDIAN)

        val byteStringToMessage = Flow[ByteString].map { bytes =>
          TestMessage.parseFrom(bytes.toArray[Byte])
        }

        val fromWebsocket = builder.add(
          Flow[Message].map {
            case bm: BinaryMessage =>
              bm.dataStream
                .via(binaryMessageDecoder)
                .via(byteStringToMessage)
          }
        )

        val backToWebsocket = builder.add(
          Flow[TestMessage].map {
            case TestMessage(text) =>
              TextMessage(text)
          }
        )

        val actorSink = Sink.actorRef[TestMessage](testActor, PoisonPill)

        import GraphDSL.Implicits._

        fromWebsocket ~> actorSink // This line does not compile

        source ~> backToWebsocket

        FlowShape(fromWebsocket.in, backToWebsocket.out)
  }
)

我不明白这里的问题。我怎样才能使这项工作?

您的 fromWebsocket 流 returns 流,而不是消息,您的 actorSink 接收消息。

如果你想让它编译你必须确保它有returns信息,例如你可以把map改成flatMapConcat。这将确保从流返回的消息连接到新的消息流中。