Akka websocket 客户端:主动断开与服务器的连接和/或替换接收器
Akka websocket client : actively disconnect from server and / or replace sink
我开始学习 Akka 并遇到了一个挑战,尽管我已经仔细阅读了文档和相关的 Stakoverflow 问题,但我仍然找不到简单的解决方案:
基于 Akka 网站上的 Client-Side Websocket Support 示例,我在 Scala 中使用以下代码片段作为基础:
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, Source.maybe)(Keep.left)
val (upgradeResponse, closed) =
Http().singleWebSocketRequest(WebSocketRequest("ws://localhost/ws"), flow)
我的用例是客户端 (printSink) 从 websocket 服务器消耗连续流。仅单向通信,因此不需要源。
那么我的问题如下:
- 我需要定期强制重新连接到 websocket 服务器,为此我需要先断开连接。但是对于我来说,我找不到一种方法来进行简单的断开连接
- 在相反的情况下,我需要保持 websocket 连接处于活动状态并“换出”接收器。这甚至可能吗,即不创建另一个 websocket 连接?
对于问题 1(强制断开与客户端的连接),这应该可行
val flow: Flow[Message, Message, (Future[Done], Promise[Option[Message])] =
Flow.fromSinkAndSourceMat(
printSink,
Source.maybe
)(Keep.both)
val (upgradeResponse, (closed, disconnect)) =
Http().singleWebsocketRequest(WebSocketRequest("ws://localhost/ws"), flow)
disconnect
然后可以用 None
完成断开连接:
disconnect.success(None)
对于问题 2,我的直觉是那种动态流操作似乎需要 custom stream operator(即比 Graph DSL 低一级,比“正常”低两级 scaladsl
/javadsl
)。老实说,我在那里没有大量的直接经验。
我开始学习 Akka 并遇到了一个挑战,尽管我已经仔细阅读了文档和相关的 Stakoverflow 问题,但我仍然找不到简单的解决方案:
基于 Akka 网站上的 Client-Side Websocket Support 示例,我在 Scala 中使用以下代码片段作为基础:
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, Source.maybe)(Keep.left)
val (upgradeResponse, closed) =
Http().singleWebSocketRequest(WebSocketRequest("ws://localhost/ws"), flow)
我的用例是客户端 (printSink) 从 websocket 服务器消耗连续流。仅单向通信,因此不需要源。
那么我的问题如下:
- 我需要定期强制重新连接到 websocket 服务器,为此我需要先断开连接。但是对于我来说,我找不到一种方法来进行简单的断开连接
- 在相反的情况下,我需要保持 websocket 连接处于活动状态并“换出”接收器。这甚至可能吗,即不创建另一个 websocket 连接?
对于问题 1(强制断开与客户端的连接),这应该可行
val flow: Flow[Message, Message, (Future[Done], Promise[Option[Message])] =
Flow.fromSinkAndSourceMat(
printSink,
Source.maybe
)(Keep.both)
val (upgradeResponse, (closed, disconnect)) =
Http().singleWebsocketRequest(WebSocketRequest("ws://localhost/ws"), flow)
disconnect
然后可以用 None
完成断开连接:
disconnect.success(None)
对于问题 2,我的直觉是那种动态流操作似乎需要 custom stream operator(即比 Graph DSL 低一级,比“正常”低两级 scaladsl
/javadsl
)。老实说,我在那里没有大量的直接经验。