从 socko 转移到 akka-http websockets
Moving from socko to akka-http websockets
我有一个基于 socko websockets 的现有 akka 应用程序。与套接字的通信发生在单个 actor 内部,离开和进入 actor 的消息(分别是传入和传出消息)都标有 socket id,这是 socko 的第一个 class 属性 websocket(在 socko 中,连接请求到达时标有 id,所有生命周期转换(如握手、断开连接、传入帧等)都带有类似的标签)
我想使用 akka-http 重新实现这个单一的 actor(socko 现在是 more-or-less 废弃软件,原因很明显)但这并不简单,因为这两个库在概念上非常不同; akka-http 隐藏了握手、断开连接等较低级别的细节,只需向绑定到 http 服务器的任何参与者发送一个 UpgradeToWebsocket 请求 header。 header object 包含一个方法,该方法采用物化流作为与客户端交换的所有消息的处理程序。
到目前为止,还不错;我能够在网络套接字上接收消息并直接回复它们。官方示例都采用某种无状态 request-reply 模型,因此我正在努力了解如何进行下一步,为物化流分配标签,管理其生命周期和连接状态(我需要通知其他人当客户端断开连接时应用程序中的参与者,以及标记消息。)
备选方案(使用 akka-streams 重塑整个应用程序)的工作量太大,因此非常感谢任何有关如何跟踪套接字的建议。
要与现有的基于角色的系统交互,您应该查看 Source.actorRef
and Sink.actorRef
。 Source.actorRef
创建一个您可以向其发送消息的 ActorRef,并且 Sink.actorRef
允许您使用 actor 处理传入消息并检测 websocket 的关闭。
要将 Source.actorRef
创建的 actor 连接到现有的长寿命 actor,请使用 Flow#mapMaterializedValue
。这也是为套接字连接分配唯一 ID 的好地方。
这 可能会让您入门。
需要注意的一件事。当使用协议栈中的 websocket close message. There is an issue open to implement this, but until it is implemented you have to do this yourself. For example by having something like this 关闭客户端到服务器的流程时,当前的 websocket 实现 不会 关闭服务器到客户端的流程。
Rüdiger Klaehn 的回答是一个有用的起点,谢谢!
最后我在阅读了这里的另一个问题 () 后选择了 ActorPublisher
。
关键是 Flow 在 akka-http 的某个地方 'materialized',所以你需要传递给 UpgradeToWebSocket.handleMessagesWithSinkSource
一个已经知道一个 Source/Sink 的对现有演员。所以我创建了一个 actor(它实现了 ActorPublisher[TextMessage.Strict]
),然后将它包装在 Source.fromPublisher(ActorPublisher(myActor))
中。
当你想从 actor 的 receive
方法向流中注入消息时,你首先检查是否 totalDemand > 0
(即流是否愿意接受输入),如果是,调用 onNext
与消息的内容。
我有一个基于 socko websockets 的现有 akka 应用程序。与套接字的通信发生在单个 actor 内部,离开和进入 actor 的消息(分别是传入和传出消息)都标有 socket id,这是 socko 的第一个 class 属性 websocket(在 socko 中,连接请求到达时标有 id,所有生命周期转换(如握手、断开连接、传入帧等)都带有类似的标签)
我想使用 akka-http 重新实现这个单一的 actor(socko 现在是 more-or-less 废弃软件,原因很明显)但这并不简单,因为这两个库在概念上非常不同; akka-http 隐藏了握手、断开连接等较低级别的细节,只需向绑定到 http 服务器的任何参与者发送一个 UpgradeToWebsocket 请求 header。 header object 包含一个方法,该方法采用物化流作为与客户端交换的所有消息的处理程序。
到目前为止,还不错;我能够在网络套接字上接收消息并直接回复它们。官方示例都采用某种无状态 request-reply 模型,因此我正在努力了解如何进行下一步,为物化流分配标签,管理其生命周期和连接状态(我需要通知其他人当客户端断开连接时应用程序中的参与者,以及标记消息。)
备选方案(使用 akka-streams 重塑整个应用程序)的工作量太大,因此非常感谢任何有关如何跟踪套接字的建议。
要与现有的基于角色的系统交互,您应该查看 Source.actorRef
and Sink.actorRef
。 Source.actorRef
创建一个您可以向其发送消息的 ActorRef,并且 Sink.actorRef
允许您使用 actor 处理传入消息并检测 websocket 的关闭。
要将 Source.actorRef
创建的 actor 连接到现有的长寿命 actor,请使用 Flow#mapMaterializedValue
。这也是为套接字连接分配唯一 ID 的好地方。
这
需要注意的一件事。当使用协议栈中的 websocket close message. There is an issue open to implement this, but until it is implemented you have to do this yourself. For example by having something like this 关闭客户端到服务器的流程时,当前的 websocket 实现 不会 关闭服务器到客户端的流程。
Rüdiger Klaehn 的回答是一个有用的起点,谢谢!
最后我在阅读了这里的另一个问题 () 后选择了 ActorPublisher
。
关键是 Flow 在 akka-http 的某个地方 'materialized',所以你需要传递给 UpgradeToWebSocket.handleMessagesWithSinkSource
一个已经知道一个 Source/Sink 的对现有演员。所以我创建了一个 actor(它实现了 ActorPublisher[TextMessage.Strict]
),然后将它包装在 Source.fromPublisher(ActorPublisher(myActor))
中。
当你想从 actor 的 receive
方法向流中注入消息时,你首先检查是否 totalDemand > 0
(即流是否愿意接受输入),如果是,调用 onNext
与消息的内容。