如何在断开连接后清理 akka-http websocket 资源然后重试?

How to clean up akka-http websocket resources following disconnection and then retry?

下面的代码成功建立了一个websocket连接。

websockets 服务器(也称为 akk-http)使用 故意关闭连接。

下面的 SinkActor 收到一条 akka.actor.Status.Failure 类型的消息,所以我知道从服务器到客户端的消息流已经中断。

我的问题是...我的客户端应该如何重新建立 websocket 连接? source.via(webSocketFlow).to(sink).run() 完成了吗?

清理资源和重试 websocket 连接的最佳做法是什么?

class ConnectionAdminActor extends Actor with ActorLogging {
  implicit val system: ActorSystem = context.system
  implicit val flowMaterializer    = ActorMaterializer()

  private val sinkActor = context.system.actorOf(Props[SinkActor], name = "SinkActor")

  private val sink = Sink.actorRefWithAck[Message](sinkActor, StartupWithActor(self.path), Ack, Complete)

  private val source = Source.actorRef[TextMessage](10, OverflowStrategy.dropHead).mapMaterializedValue {
    ref => {
      self ! StartupWithActor(ref.path)
      ref
    }
  }

  private val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080"))

  source
    .via(webSocketFlow)
    .to(sink)
    .run()

试试 recoverWithRetries 组合器(文档 here)。

这允许您提供一个替代 Source 您的管道将切换到,以防上游发生故障。在最简单的情况下,您可以只 re-use 相同 Source,这应该会发出一个新连接。

val wsSource = source via webSocketFlow

wsSource
  .recoverWithRetries(attempts = -1, {case e: Throwable => wsSource})
  .to(sink)

注意

  • attempts = -1 将无限期地重试重新连接
  • 部分功能允许更精细地控制哪些异常可以触发重新连接