Akka HTTP 客户端 websocket 流的定义
Definition of Akka HTTP client-side websocket streams
我过去曾成功使用过 Akka Streams,但是,我目前很难理解为什么 Akka-HTTP 中的客户端 Websocket Streams 被定义并像 documentation.
由于 WebSocket 连接允许全双工通信,我希望这种连接由 Akka HTTP 中的两个独立流表示,一个用于传入流量,一个用于传出流量。事实上,文档说明如下:
A WebSocket consists of two streams of messages [...]
它进一步指出传入消息由 Sink
表示,传出消息由 Source
表示。这是我的第一个困惑点——当使用两个独立的流时,您会期望总共处理两个源和两个汇,而不是每种都处理一个。目前,我的猜测是传入流的来源以及传出流的接收器对开发人员来说并没有多大用处,因此只是 "hidden".
但是,将所有内容连接在一起时确实会让人感到困惑(请参阅上面链接的文档)。
使用singleWebSocketRequest
时出现问题的部分:
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
或使用webSocketClientFlow
时相同的部分:
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
这与我目前对流的工作流程的理解相矛盾。
- 为什么我要将传出消息的
Source
和传入消息的 Sink
结合起来?上面的代码看起来像是我在向自己而不是服务器发送消息。
- 此外,
Flow[Message, Message, ...]
的语义是什么?将传出消息转换为传入消息似乎没有意义。
- 不应该有两个流而不是一个吗?
感谢任何帮助我提高理解力的人,谢谢。
编辑:
我使用 Source
和 Sink
并通过 WebSocket 发送数据没有问题,我只是想了解为什么阶段的连接是这样完成的。
WebSocket 确实由两个独立的流组成,只是这些流(可能)不在同一个 JVM 上。
您有两个对等点进行通信,一个是服务器,另一个是客户端,但是从已建立的 WebSocket 连接的角度来看,差异不再重要。一个数据流是对等点 1 向对等点 2 发送消息,另一个数据流是对等点 2 向对等点 1 发送消息,然后这两个对等点之间存在网络边界。如果您一次查看一个对等点,您会看到对等点 1 从对等点 2 接收消息,并且在第二个流中对等点 1 正在向对等点 2 发送消息。
每个peer都有一个接收端的Sink和一个发送端的Source。您实际上总共有两个 Sources 和两个 Sinks,只是不是都在同一个 ActorSystem 上(为了解释起见,假设两个对等点都是在 Akka HTTP 中实现的)。 peer 1 的 Source 连接到 peer 2 的 Sink,peer 2 的 Source 连接到 peer 1 的 Sink。
因此,您编写了一个描述如何通过第一个流处理传入消息的接收器和一个描述如何通过第二个流发送消息的源。通常,您希望根据收到的消息生成消息,因此您可以将这两者连接在一起,并通过不同的流路由消息,这些流描述了如何对传入消息做出反应,并生成任意数量的传出消息。 Flow[Message, Message, _]
并不意味着您要将传出消息转换为传入消息,而是将传入消息转换为传出消息。
webSocketFlow
是典型的异步边界,代表对方的流。它是 "transforming" 传出消息到传入消息,方法是将它们发送到另一个对等点并发出另一个对等点发送的任何内容。
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
此流是您的同行的两个流的一半:
[message from other peer]
连接到 printSink
helloSource
连接到 [message to the other peer]
传入消息和传出消息之间没有关系,您只需打印收到的所有内容并发送一个 "hello world!"。实际上,由于源在一条消息后完成,WebSocket 连接也会关闭,但是如果您将源替换为例如 Source.repeat
,您将不断地发送(泛洪,真的)"hello, world!"电汇,无论传入消息的速率如何。
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
在这里,您获取来自 outgoing
的所有内容,即您要发送的消息,通过 webSocketFlow
路由它,"transforms" 通过与其他对等方通信的消息并将收到的每条消息生成 incoming
。通常你有一个有线协议,你可以在其中将你的案例 class/pojo/dto 消息编码和解码为有线格式。
val encode: Flow[T, Message, _] = ???
val decode: Flow[Message, T, _] = ???
val upgradeResponse = outgoing
.via(encode)
.viaMat(webSocketFlow)(Keep.right)
.via(decode)
.to(incoming)
.run()
或者您可以想象某种聊天服务器(啊,websockets 和聊天),它广播和合并来自和向许多客户端发送的消息。这应该从任何客户端获取任何消息并将其发送到每个客户端(仅用于演示,未经测试,可能不是您想要的实际聊天服务器):
val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
val chatClientSenders: Seq[Source[Message, NotUsed]] = ???
// each of those receivers/senders could be paired in their own websocket compatible flow
val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
(chatClientReceivers, chatClientSenders).zipped.map(
(outgoingSendToClient, incomingFromClient) =>
Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))
val toClients: Graph[SinkShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))
(broadcast.outArray, chatClientReceivers).zipped
.foreach((bcOut, client) => bcOut ~> b.add(client).in)
SinkShape(broadcast.in)
}
val fromClients: Graph[SourceShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Message](chatClientSenders.size))
(merge.inSeq, chatClientSenders).zipped
.foreach((mIn, client) => b.add(client).out ~> mIn)
SourceShape(merge.out)
}
val upgradeResponse: Future[WebSocketUpgradeResponse] =
Source.fromGraph(fromClients)
.viaMat(webSocketFlow)(Keep.right)
.to(toClients)
.run()
希望对您有所帮助。
我过去曾成功使用过 Akka Streams,但是,我目前很难理解为什么 Akka-HTTP 中的客户端 Websocket Streams 被定义并像 documentation.
由于 WebSocket 连接允许全双工通信,我希望这种连接由 Akka HTTP 中的两个独立流表示,一个用于传入流量,一个用于传出流量。事实上,文档说明如下:
A WebSocket consists of two streams of messages [...]
它进一步指出传入消息由 Sink
表示,传出消息由 Source
表示。这是我的第一个困惑点——当使用两个独立的流时,您会期望总共处理两个源和两个汇,而不是每种都处理一个。目前,我的猜测是传入流的来源以及传出流的接收器对开发人员来说并没有多大用处,因此只是 "hidden".
但是,将所有内容连接在一起时确实会让人感到困惑(请参阅上面链接的文档)。
使用singleWebSocketRequest
时出现问题的部分:
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
或使用webSocketClientFlow
时相同的部分:
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
这与我目前对流的工作流程的理解相矛盾。
- 为什么我要将传出消息的
Source
和传入消息的Sink
结合起来?上面的代码看起来像是我在向自己而不是服务器发送消息。 - 此外,
Flow[Message, Message, ...]
的语义是什么?将传出消息转换为传入消息似乎没有意义。 - 不应该有两个流而不是一个吗?
感谢任何帮助我提高理解力的人,谢谢。
编辑:
我使用 Source
和 Sink
并通过 WebSocket 发送数据没有问题,我只是想了解为什么阶段的连接是这样完成的。
WebSocket 确实由两个独立的流组成,只是这些流(可能)不在同一个 JVM 上。
您有两个对等点进行通信,一个是服务器,另一个是客户端,但是从已建立的 WebSocket 连接的角度来看,差异不再重要。一个数据流是对等点 1 向对等点 2 发送消息,另一个数据流是对等点 2 向对等点 1 发送消息,然后这两个对等点之间存在网络边界。如果您一次查看一个对等点,您会看到对等点 1 从对等点 2 接收消息,并且在第二个流中对等点 1 正在向对等点 2 发送消息。
每个peer都有一个接收端的Sink和一个发送端的Source。您实际上总共有两个 Sources 和两个 Sinks,只是不是都在同一个 ActorSystem 上(为了解释起见,假设两个对等点都是在 Akka HTTP 中实现的)。 peer 1 的 Source 连接到 peer 2 的 Sink,peer 2 的 Source 连接到 peer 1 的 Sink。
因此,您编写了一个描述如何通过第一个流处理传入消息的接收器和一个描述如何通过第二个流发送消息的源。通常,您希望根据收到的消息生成消息,因此您可以将这两者连接在一起,并通过不同的流路由消息,这些流描述了如何对传入消息做出反应,并生成任意数量的传出消息。 Flow[Message, Message, _]
并不意味着您要将传出消息转换为传入消息,而是将传入消息转换为传出消息。
webSocketFlow
是典型的异步边界,代表对方的流。它是 "transforming" 传出消息到传入消息,方法是将它们发送到另一个对等点并发出另一个对等点发送的任何内容。
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
此流是您的同行的两个流的一半:
[message from other peer]
连接到printSink
helloSource
连接到[message to the other peer]
传入消息和传出消息之间没有关系,您只需打印收到的所有内容并发送一个 "hello world!"。实际上,由于源在一条消息后完成,WebSocket 连接也会关闭,但是如果您将源替换为例如 Source.repeat
,您将不断地发送(泛洪,真的)"hello, world!"电汇,无论传入消息的速率如何。
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
在这里,您获取来自 outgoing
的所有内容,即您要发送的消息,通过 webSocketFlow
路由它,"transforms" 通过与其他对等方通信的消息并将收到的每条消息生成 incoming
。通常你有一个有线协议,你可以在其中将你的案例 class/pojo/dto 消息编码和解码为有线格式。
val encode: Flow[T, Message, _] = ???
val decode: Flow[Message, T, _] = ???
val upgradeResponse = outgoing
.via(encode)
.viaMat(webSocketFlow)(Keep.right)
.via(decode)
.to(incoming)
.run()
或者您可以想象某种聊天服务器(啊,websockets 和聊天),它广播和合并来自和向许多客户端发送的消息。这应该从任何客户端获取任何消息并将其发送到每个客户端(仅用于演示,未经测试,可能不是您想要的实际聊天服务器):
val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
val chatClientSenders: Seq[Source[Message, NotUsed]] = ???
// each of those receivers/senders could be paired in their own websocket compatible flow
val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
(chatClientReceivers, chatClientSenders).zipped.map(
(outgoingSendToClient, incomingFromClient) =>
Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))
val toClients: Graph[SinkShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))
(broadcast.outArray, chatClientReceivers).zipped
.foreach((bcOut, client) => bcOut ~> b.add(client).in)
SinkShape(broadcast.in)
}
val fromClients: Graph[SourceShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Message](chatClientSenders.size))
(merge.inSeq, chatClientSenders).zipped
.foreach((mIn, client) => b.add(client).out ~> mIn)
SourceShape(merge.out)
}
val upgradeResponse: Future[WebSocketUpgradeResponse] =
Source.fromGraph(fromClients)
.viaMat(webSocketFlow)(Keep.right)
.to(toClients)
.run()
希望对您有所帮助。