watchTermination 不会在 akka-http 流程中触发
watchTermination is not triggered in an akka-http flow
我目前正在尝试建立一个 akka-http websocket 连接,它可以:
- 向所有连接的客户端广播
- 回答客户的某些请求
到目前为止,我是这样创建流程的:
// keeps a list of all actors so I can broadcast to them
var actors: List[ActorRef] = Nil
private def wsFlow(implicit materializer: ActorMaterializer): Flow[ws.Message, ws.Message, NotUsed] = {
val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
// this never triggers
source.watchTermination() { (m, f) =>
f.onComplete(r => println("TERMINATION: " + r.toString))
actors = actors diff actor :: Nil
m
}
actors = actor :: actors
val wsHandler: Flow[ws.Message, ws.Message, NotUsed] =
Flow[ws.Message]
.merge(source)
.map {
case TextMessage.Strict(tm) => handleMessage(actor, tm)
case _ => TextMessage.Strict("Ignored message!")
}
wsHandler
}
def broadcast(msg: String): Unit = {
actors.foreach(_ ! TextMessage.Strict(msg))
}
我遇到的 - 希望 - 最后一个问题是 watchTermination
回调永远不会触发(我从来没有在我的控制台上收到 "TERMINATION: ..." 消息)。这是为什么?以及如何检测客户何时离开(这样我就可以将他从我的actors
列表中删除)?
我知道怎么做了:
val wsHandler: Flow[ws.Message, ws.Message, NotUsed] = Flow[ws.Message]
.watchTermination() { (m, f) =>
f.onComplete(r => {
println("Client left: " + r.toString)
actors = actors diff actor :: Nil
}
)
m
}
.merge(source)
.map {
case TextMessage.Strict(tm) => handleMessage(actor, tm)
case _ => TextMessage.Strict("Ignored message!")
}
我目前正在尝试建立一个 akka-http websocket 连接,它可以:
- 向所有连接的客户端广播
- 回答客户的某些请求
到目前为止,我是这样创建流程的:
// keeps a list of all actors so I can broadcast to them
var actors: List[ActorRef] = Nil
private def wsFlow(implicit materializer: ActorMaterializer): Flow[ws.Message, ws.Message, NotUsed] = {
val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
// this never triggers
source.watchTermination() { (m, f) =>
f.onComplete(r => println("TERMINATION: " + r.toString))
actors = actors diff actor :: Nil
m
}
actors = actor :: actors
val wsHandler: Flow[ws.Message, ws.Message, NotUsed] =
Flow[ws.Message]
.merge(source)
.map {
case TextMessage.Strict(tm) => handleMessage(actor, tm)
case _ => TextMessage.Strict("Ignored message!")
}
wsHandler
}
def broadcast(msg: String): Unit = {
actors.foreach(_ ! TextMessage.Strict(msg))
}
我遇到的 - 希望 - 最后一个问题是 watchTermination
回调永远不会触发(我从来没有在我的控制台上收到 "TERMINATION: ..." 消息)。这是为什么?以及如何检测客户何时离开(这样我就可以将他从我的actors
列表中删除)?
我知道怎么做了:
val wsHandler: Flow[ws.Message, ws.Message, NotUsed] = Flow[ws.Message]
.watchTermination() { (m, f) =>
f.onComplete(r => {
println("Client left: " + r.toString)
actors = actors diff actor :: Nil
}
)
m
}
.merge(source)
.map {
case TextMessage.Strict(tm) => handleMessage(actor, tm)
case _ => TextMessage.Strict("Ignored message!")
}