如何在 akka websocket 中使用 Framing
How to use Framing with akka websocket
我是 akka-stream 的新手,我正在尝试将 Framing.lengthField 与 websocket https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala
一起使用
这是我的资料:
path("websocket") {
handleWebSocketMessages(websocketFlow)
}
def websocketFlow(): Flow[Message, Message, Any] =
Flow.fromGraph(
GraphDSL.create(Source.actorRef[TestMessage](bufferSize = 10, OverflowStrategy.fail)) {
implicit builder =>
source =>
val binaryMessageDecoder = Framing.lengthField(4, 0, 1024 * 32, ByteOrder.BIG_ENDIAN)
val byteStringToMessage = Flow[ByteString].map { bytes =>
TestMessage.parseFrom(bytes.toArray[Byte])
}
val fromWebsocket = builder.add(
Flow[Message].map {
case bm: BinaryMessage =>
bm.dataStream
.via(binaryMessageDecoder)
.via(byteStringToMessage)
}
)
val backToWebsocket = builder.add(
Flow[TestMessage].map {
case TestMessage(text) =>
TextMessage(text)
}
)
val actorSink = Sink.actorRef[TestMessage](testActor, PoisonPill)
import GraphDSL.Implicits._
fromWebsocket ~> actorSink // This line does not compile
source ~> backToWebsocket
FlowShape(fromWebsocket.in, backToWebsocket.out)
}
)
我不明白这里的问题。我怎样才能使这项工作?
您的 fromWebsocket
流 returns 流,而不是消息,您的 actorSink
接收消息。
如果你想让它编译你必须确保它有returns信息,例如你可以把map
改成flatMapConcat
。这将确保从流返回的消息连接到新的消息流中。
我是 akka-stream 的新手,我正在尝试将 Framing.lengthField 与 websocket https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala
一起使用这是我的资料:
path("websocket") {
handleWebSocketMessages(websocketFlow)
}
def websocketFlow(): Flow[Message, Message, Any] =
Flow.fromGraph(
GraphDSL.create(Source.actorRef[TestMessage](bufferSize = 10, OverflowStrategy.fail)) {
implicit builder =>
source =>
val binaryMessageDecoder = Framing.lengthField(4, 0, 1024 * 32, ByteOrder.BIG_ENDIAN)
val byteStringToMessage = Flow[ByteString].map { bytes =>
TestMessage.parseFrom(bytes.toArray[Byte])
}
val fromWebsocket = builder.add(
Flow[Message].map {
case bm: BinaryMessage =>
bm.dataStream
.via(binaryMessageDecoder)
.via(byteStringToMessage)
}
)
val backToWebsocket = builder.add(
Flow[TestMessage].map {
case TestMessage(text) =>
TextMessage(text)
}
)
val actorSink = Sink.actorRef[TestMessage](testActor, PoisonPill)
import GraphDSL.Implicits._
fromWebsocket ~> actorSink // This line does not compile
source ~> backToWebsocket
FlowShape(fromWebsocket.in, backToWebsocket.out)
}
)
我不明白这里的问题。我怎样才能使这项工作?
您的 fromWebsocket
流 returns 流,而不是消息,您的 actorSink
接收消息。
如果你想让它编译你必须确保它有returns信息,例如你可以把map
改成flatMapConcat
。这将确保从流返回的消息连接到新的消息流中。