用akka-http应答一个特定的客户端,也支持广播
Answer a specific client with akka-http and also support broadcast
我对使用 akka-http 库创建服务器有点迷茫。我需要建立的通讯如下:
- 有一台服务器和n个客户端(n < 5)
- 有时客户端向服务器发送命令,服务器evaluates/delegates命令并回答客户端
- 服务器不断向所有客户端广播消息
鉴于:
- 我的服务器需要管理通过 websocket
连接的多个 'sessions'
这是我的 websocket 端点:
path("socket") {
handleWebSocketMessages(listen())
}
这里是 listen()
方法:
// stores offers to broadcast to all clients
private var offers: List[TextMessage => Unit] = List()
def listen(): Flow[Message, Message, NotUsed] = {
val inbound: Sink[Message, Any] = Sink.foreach(m => /* handle the message */) // (*)
val outbound: Source[Message, SourceQueueWithComplete[Message]] =
Source.queue[Message](16, OverflowStrategy.fail)
Flow.fromSinkAndSourceMat(inbound, outbound)((_, outboundMat) => {
offers ::= outboundMat.offer
NotUsed
})
}
def sendText(text: String): Unit = {
for (connection <- offers) connection(TextMessage.Strict(text))
}
通过这种方法,我可以注册多个客户端并使用 sendText(text: String)
方法回答他们。但是,有一个大问题:在我评估了它的命令后,我如何只回答特定的客户。 (参见 (*)
)
[困扰我的另一件事是 offers
是一个 var,当以纯 FP 方式编程时这似乎是错误的,但如果其余的工作正常我可以接受]
编辑:
为了详细说明,我基本上需要能够实现如下所示的方法:
def onMessageReceived(m: Message, answer: TextMessage => Unit): Unit = {
val response: TextMessage = handleMessage(m)
answer(response)
}
但是我不知道在我的 websocket Flow 中在哪里调用这个方法。
我不确定这是否可行,但这似乎有效:
var actors: List[ActorRef] = Nil
private def wsFlow(implicit materializer: ActorMaterializer): Flow[ws.Message, ws.Message, NotUsed] = {
val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
actors = actor :: actors
val wsHandler: Flow[ws.Message, ws.Message, NotUsed] =
Flow[ws.Message]
.merge(source)
.map {
case TextMessage.Strict(tm) => handleMessage(actor, tm)
case _ => TextMessage.Strict("Ignored message!")
}
wsHandler
}
def broadcast(msg: String): Unit = {
actors.foreach(_ ! TextMessage.Strict(msg))
}
我对使用 akka-http 库创建服务器有点迷茫。我需要建立的通讯如下:
- 有一台服务器和n个客户端(n < 5)
- 有时客户端向服务器发送命令,服务器evaluates/delegates命令并回答客户端
- 服务器不断向所有客户端广播消息
鉴于:
- 我的服务器需要管理通过 websocket 连接的多个 'sessions'
这是我的 websocket 端点:
path("socket") {
handleWebSocketMessages(listen())
}
这里是 listen()
方法:
// stores offers to broadcast to all clients
private var offers: List[TextMessage => Unit] = List()
def listen(): Flow[Message, Message, NotUsed] = {
val inbound: Sink[Message, Any] = Sink.foreach(m => /* handle the message */) // (*)
val outbound: Source[Message, SourceQueueWithComplete[Message]] =
Source.queue[Message](16, OverflowStrategy.fail)
Flow.fromSinkAndSourceMat(inbound, outbound)((_, outboundMat) => {
offers ::= outboundMat.offer
NotUsed
})
}
def sendText(text: String): Unit = {
for (connection <- offers) connection(TextMessage.Strict(text))
}
通过这种方法,我可以注册多个客户端并使用 sendText(text: String)
方法回答他们。但是,有一个大问题:在我评估了它的命令后,我如何只回答特定的客户。 (参见 (*)
)
[困扰我的另一件事是 offers
是一个 var,当以纯 FP 方式编程时这似乎是错误的,但如果其余的工作正常我可以接受]
编辑:
为了详细说明,我基本上需要能够实现如下所示的方法:
def onMessageReceived(m: Message, answer: TextMessage => Unit): Unit = {
val response: TextMessage = handleMessage(m)
answer(response)
}
但是我不知道在我的 websocket Flow 中在哪里调用这个方法。
我不确定这是否可行,但这似乎有效:
var actors: List[ActorRef] = Nil
private def wsFlow(implicit materializer: ActorMaterializer): Flow[ws.Message, ws.Message, NotUsed] = {
val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
actors = actor :: actors
val wsHandler: Flow[ws.Message, ws.Message, NotUsed] =
Flow[ws.Message]
.merge(source)
.map {
case TextMessage.Strict(tm) => handleMessage(actor, tm)
case _ => TextMessage.Strict("Ignored message!")
}
wsHandler
}
def broadcast(msg: String): Unit = {
actors.foreach(_ ! TextMessage.Strict(msg))
}