如何在断开连接后清理 akka-http websocket 资源然后重试?
How to clean up akka-http websocket resources following disconnection and then retry?
下面的代码成功建立了一个websocket连接。
websockets 服务器(也称为 akk-http)使用 故意关闭连接。
下面的 SinkActor
收到一条 akka.actor.Status.Failure
类型的消息,所以我知道从服务器到客户端的消息流已经中断。
我的问题是...我的客户端应该如何重新建立 websocket 连接? source.via(webSocketFlow).to(sink).run()
完成了吗?
清理资源和重试 websocket 连接的最佳做法是什么?
class ConnectionAdminActor extends Actor with ActorLogging {
implicit val system: ActorSystem = context.system
implicit val flowMaterializer = ActorMaterializer()
private val sinkActor = context.system.actorOf(Props[SinkActor], name = "SinkActor")
private val sink = Sink.actorRefWithAck[Message](sinkActor, StartupWithActor(self.path), Ack, Complete)
private val source = Source.actorRef[TextMessage](10, OverflowStrategy.dropHead).mapMaterializedValue {
ref => {
self ! StartupWithActor(ref.path)
ref
}
}
private val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080"))
source
.via(webSocketFlow)
.to(sink)
.run()
试试 recoverWithRetries
组合器(文档 here)。
这允许您提供一个替代 Source
您的管道将切换到,以防上游发生故障。在最简单的情况下,您可以只 re-use 相同 Source
,这应该会发出一个新连接。
val wsSource = source via webSocketFlow
wsSource
.recoverWithRetries(attempts = -1, {case e: Throwable => wsSource})
.to(sink)
注意
attempts = -1
将无限期地重试重新连接
- 部分功能允许更精细地控制哪些异常可以触发重新连接
下面的代码成功建立了一个websocket连接。
websockets 服务器(也称为 akk-http)使用
下面的 SinkActor
收到一条 akka.actor.Status.Failure
类型的消息,所以我知道从服务器到客户端的消息流已经中断。
我的问题是...我的客户端应该如何重新建立 websocket 连接? source.via(webSocketFlow).to(sink).run()
完成了吗?
清理资源和重试 websocket 连接的最佳做法是什么?
class ConnectionAdminActor extends Actor with ActorLogging {
implicit val system: ActorSystem = context.system
implicit val flowMaterializer = ActorMaterializer()
private val sinkActor = context.system.actorOf(Props[SinkActor], name = "SinkActor")
private val sink = Sink.actorRefWithAck[Message](sinkActor, StartupWithActor(self.path), Ack, Complete)
private val source = Source.actorRef[TextMessage](10, OverflowStrategy.dropHead).mapMaterializedValue {
ref => {
self ! StartupWithActor(ref.path)
ref
}
}
private val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080"))
source
.via(webSocketFlow)
.to(sink)
.run()
试试 recoverWithRetries
组合器(文档 here)。
这允许您提供一个替代 Source
您的管道将切换到,以防上游发生故障。在最简单的情况下,您可以只 re-use 相同 Source
,这应该会发出一个新连接。
val wsSource = source via webSocketFlow
wsSource
.recoverWithRetries(attempts = -1, {case e: Throwable => wsSource})
.to(sink)
注意
attempts = -1
将无限期地重试重新连接- 部分功能允许更精细地控制哪些异常可以触发重新连接