如何为 Akka http websockets 添加错误流

How to add an error flow for Akka http websockets

我一直在用头撞墙很长一段时间,因为我不知道如何为 akka http websocket 流添加错误流。我想要实现的是:

如果没有错误处理,这很容易,但我不知道如何添加错误。这是我拥有的:

type GameDecodeResult =
  Either[(String, io.circe.Error), GameLobby.LobbyRequest]

val errorFlow =
  Flow[GameDecodeResult]
    .mapConcat {
      case Left(err) => err :: Nil
      case Right(_) => Nil
    }
    .map { case (message, error) =>
      logger.info(s"failed to parse message $message", error)
      TextMessage(Error(error.toString).asJson.spaces2)
    }

val normalFlow = {
  val normalFlowSink =
    Flow[GameDecodeResult]
      .mapConcat {
        case Right(msg) => msg :: Nil
        case Left(_) => Nil
      }
      .map(req => GameLobby.IncomingMessage(userId, req))
      .to(Sink.actorRef[GameLobby.IncomingMessage](gameLobby, PoisonPill))

  val normalFlowSource: Source[Message, NotUsed] =
    Source.actorRef[GameLobby.OutgoingMessage](10, OverflowStrategy.fail)
      .mapMaterializedValue { outActor =>
        gameLobby ! GameLobby.UserConnected(userId, outActor)
        NotUsed
      }
      .map(outMessage => TextMessage(Ok(outMessage.message).asJson.spaces2))

  Flow.fromSinkAndSource(normalFlowSink, normalFlowSource)
}

val incomingMessageParser =
  Flow[Message]
    .flatMapConcat {
      case tm: TextMessage =>
        tm.textStream
      case bm: BinaryMessage =>
        bm.dataStream.runWith(Sink.ignore)
        Source.empty }
    .map { message =>
      decode[GameLobby.LobbyRequest](message).left.map(err => message -> err)
    }

这些是我定义的流程,我认为这应该足够好了,但我不知道如何 assemble 它们并且 akka 流的复杂性 API 没有帮助。这是我尝试过的:

val x: Flow[Message, Message, NotUsed] =
  GraphDSL.create(incomingMessageParser, normalFlow, errorFlow)((_, _, _)) { implicit builder =>
    (incoming, normal, error) =>
    import GraphDSL.Implicits._

    val partitioner = builder.add(Partition[GameDecodeResult](2, {
      case Right(_) => 0
      case Left(_) => 1
    }))

    val merge = builder.add(Merge[Message](2))

    incoming.in ~> partitioner ~> normal ~> merge
                   partitioner ~> error ~> merge
  }

但诚然,我完全不知道 GraphDSL.create 是如何工作的,我可以在哪里使用 ~> 箭头,或者我在最后一部分中在做什么。它只是不会进行类型检查,错误消息对我一点帮助也没有。

在您使用 GraphDSL 构建的 Flow 中需要修复一些问题:

  1. 无需将 3 个子流传递给 GraphDSL.create 方法,因为这只需要自定义图形的物化值。您已经确定图表的物化值将是 NotUsed.

  2. 使用~>运算符连接incoming时,需要将其出口(.out)连接到分区阶段。

  3. 每个 GraphDSL 定义块都需要 return 图形的形状 - 即它的外部端口。您可以通过 returning 一个 FlowShape 来做到这一点,其中 incoming.in 作为输入,merge.out 作为输出。这些将定义您的自定义流程的蓝图。

  4. 因为最后你想获得一个 Flow,你错过了最后一次调用 create 来自你定义的图表。这次调用是Flow.fromGraph(...).

下面的代码示例:

  val x: Flow[Message, Message, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val partitioner = builder.add(Partition[GameDecodeResult](2, {
        case Right(_) => 0
        case Left(_) => 1
      }))

      val merge = builder.add(Merge[Message](2))
      val incoming = builder.add(incomingMessageParser)

      incoming.out ~> partitioner
                      partitioner ~> normalFlow ~> merge
                      partitioner ~> errorFlow ~> merge
      FlowShape(incoming.in, merge.out)
    })