如何为 Akka http websockets 添加错误流
How to add an error flow for Akka http websockets
我一直在用头撞墙很长一段时间,因为我不知道如何为 akka http websocket 流添加错误流。我想要实现的是:
- 消息来自 WS 客户端
- 它是用来自 json
的 circe 解析的
- 如果消息格式正确,将解析后的消息发送给参与者
- 如果消息格式错误return向客户端发送错误消息
- 演员可以另外向客户端发送消息
如果没有错误处理,这很容易,但我不知道如何添加错误。这是我拥有的:
type GameDecodeResult =
Either[(String, io.circe.Error), GameLobby.LobbyRequest]
val errorFlow =
Flow[GameDecodeResult]
.mapConcat {
case Left(err) => err :: Nil
case Right(_) => Nil
}
.map { case (message, error) =>
logger.info(s"failed to parse message $message", error)
TextMessage(Error(error.toString).asJson.spaces2)
}
val normalFlow = {
val normalFlowSink =
Flow[GameDecodeResult]
.mapConcat {
case Right(msg) => msg :: Nil
case Left(_) => Nil
}
.map(req => GameLobby.IncomingMessage(userId, req))
.to(Sink.actorRef[GameLobby.IncomingMessage](gameLobby, PoisonPill))
val normalFlowSource: Source[Message, NotUsed] =
Source.actorRef[GameLobby.OutgoingMessage](10, OverflowStrategy.fail)
.mapMaterializedValue { outActor =>
gameLobby ! GameLobby.UserConnected(userId, outActor)
NotUsed
}
.map(outMessage => TextMessage(Ok(outMessage.message).asJson.spaces2))
Flow.fromSinkAndSource(normalFlowSink, normalFlowSource)
}
val incomingMessageParser =
Flow[Message]
.flatMapConcat {
case tm: TextMessage =>
tm.textStream
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Source.empty }
.map { message =>
decode[GameLobby.LobbyRequest](message).left.map(err => message -> err)
}
这些是我定义的流程,我认为这应该足够好了,但我不知道如何 assemble 它们并且 akka 流的复杂性 API 没有帮助。这是我尝试过的:
val x: Flow[Message, Message, NotUsed] =
GraphDSL.create(incomingMessageParser, normalFlow, errorFlow)((_, _, _)) { implicit builder =>
(incoming, normal, error) =>
import GraphDSL.Implicits._
val partitioner = builder.add(Partition[GameDecodeResult](2, {
case Right(_) => 0
case Left(_) => 1
}))
val merge = builder.add(Merge[Message](2))
incoming.in ~> partitioner ~> normal ~> merge
partitioner ~> error ~> merge
}
但诚然,我完全不知道 GraphDSL.create
是如何工作的,我可以在哪里使用 ~>
箭头,或者我在最后一部分中在做什么。它只是不会进行类型检查,错误消息对我一点帮助也没有。
在您使用 GraphDSL 构建的 Flow 中需要修复一些问题:
无需将 3 个子流传递给 GraphDSL.create
方法,因为这只需要自定义图形的物化值。您已经确定图表的物化值将是 NotUsed
.
使用~>
运算符连接incoming
时,需要将其出口(.out
)连接到分区阶段。
每个 GraphDSL 定义块都需要 return 图形的形状 - 即它的外部端口。您可以通过 returning 一个 FlowShape
来做到这一点,其中 incoming.in
作为输入,merge.out
作为输出。这些将定义您的自定义流程的蓝图。
因为最后你想获得一个 Flow
,你错过了最后一次调用 create 来自你定义的图表。这次调用是Flow.fromGraph(...)
.
下面的代码示例:
val x: Flow[Message, Message, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val partitioner = builder.add(Partition[GameDecodeResult](2, {
case Right(_) => 0
case Left(_) => 1
}))
val merge = builder.add(Merge[Message](2))
val incoming = builder.add(incomingMessageParser)
incoming.out ~> partitioner
partitioner ~> normalFlow ~> merge
partitioner ~> errorFlow ~> merge
FlowShape(incoming.in, merge.out)
})
我一直在用头撞墙很长一段时间,因为我不知道如何为 akka http websocket 流添加错误流。我想要实现的是:
- 消息来自 WS 客户端
- 它是用来自 json 的 circe 解析的
- 如果消息格式正确,将解析后的消息发送给参与者
- 如果消息格式错误return向客户端发送错误消息
- 演员可以另外向客户端发送消息
如果没有错误处理,这很容易,但我不知道如何添加错误。这是我拥有的:
type GameDecodeResult =
Either[(String, io.circe.Error), GameLobby.LobbyRequest]
val errorFlow =
Flow[GameDecodeResult]
.mapConcat {
case Left(err) => err :: Nil
case Right(_) => Nil
}
.map { case (message, error) =>
logger.info(s"failed to parse message $message", error)
TextMessage(Error(error.toString).asJson.spaces2)
}
val normalFlow = {
val normalFlowSink =
Flow[GameDecodeResult]
.mapConcat {
case Right(msg) => msg :: Nil
case Left(_) => Nil
}
.map(req => GameLobby.IncomingMessage(userId, req))
.to(Sink.actorRef[GameLobby.IncomingMessage](gameLobby, PoisonPill))
val normalFlowSource: Source[Message, NotUsed] =
Source.actorRef[GameLobby.OutgoingMessage](10, OverflowStrategy.fail)
.mapMaterializedValue { outActor =>
gameLobby ! GameLobby.UserConnected(userId, outActor)
NotUsed
}
.map(outMessage => TextMessage(Ok(outMessage.message).asJson.spaces2))
Flow.fromSinkAndSource(normalFlowSink, normalFlowSource)
}
val incomingMessageParser =
Flow[Message]
.flatMapConcat {
case tm: TextMessage =>
tm.textStream
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Source.empty }
.map { message =>
decode[GameLobby.LobbyRequest](message).left.map(err => message -> err)
}
这些是我定义的流程,我认为这应该足够好了,但我不知道如何 assemble 它们并且 akka 流的复杂性 API 没有帮助。这是我尝试过的:
val x: Flow[Message, Message, NotUsed] =
GraphDSL.create(incomingMessageParser, normalFlow, errorFlow)((_, _, _)) { implicit builder =>
(incoming, normal, error) =>
import GraphDSL.Implicits._
val partitioner = builder.add(Partition[GameDecodeResult](2, {
case Right(_) => 0
case Left(_) => 1
}))
val merge = builder.add(Merge[Message](2))
incoming.in ~> partitioner ~> normal ~> merge
partitioner ~> error ~> merge
}
但诚然,我完全不知道 GraphDSL.create
是如何工作的,我可以在哪里使用 ~>
箭头,或者我在最后一部分中在做什么。它只是不会进行类型检查,错误消息对我一点帮助也没有。
在您使用 GraphDSL 构建的 Flow 中需要修复一些问题:
无需将 3 个子流传递给
GraphDSL.create
方法,因为这只需要自定义图形的物化值。您已经确定图表的物化值将是NotUsed
.使用
~>
运算符连接incoming
时,需要将其出口(.out
)连接到分区阶段。每个 GraphDSL 定义块都需要 return 图形的形状 - 即它的外部端口。您可以通过 returning 一个
FlowShape
来做到这一点,其中incoming.in
作为输入,merge.out
作为输出。这些将定义您的自定义流程的蓝图。因为最后你想获得一个
Flow
,你错过了最后一次调用 create 来自你定义的图表。这次调用是Flow.fromGraph(...)
.
下面的代码示例:
val x: Flow[Message, Message, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val partitioner = builder.add(Partition[GameDecodeResult](2, {
case Right(_) => 0
case Left(_) => 1
}))
val merge = builder.add(Merge[Message](2))
val incoming = builder.add(incomingMessageParser)
incoming.out ~> partitioner
partitioner ~> normalFlow ~> merge
partitioner ~> errorFlow ~> merge
FlowShape(incoming.in, merge.out)
})