为什么 Play 框架不关闭 Akka Stream?
Why doesn't Play framework close the Akka Stream?
一个 actor 初始化一个连接到 websocket 的 Akka 流。这是通过使用 Source.actorRef
来完成的,消息可以发送到该消息,然后由 webSocketClientFlow
处理并由 Sink.foreach
消费。这可以在以下代码中看到(源自 akka docs):
class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging {
final implicit val system: ActorSystem = ActorSystem()
final implicit val materializer: ActorMaterializer = ActorMaterializer()
def receive = {
case _ =>
}
// Consume the incoming messages from the websocket.
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case misc => println(misc)
}
// Source through which we can send messages to the websocket.
val outgoing: Source[TextMessage, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com"))
// Materialized the stream
val ((ws,upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// Check whether the server has accepted the websocket request.
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Failed: ${upgrade.response.status}")
}
}
// When the connection has been established.
connected.onComplete(println)
// When the stream has closed
closed.onComplete {
case Success(_) => println("Test Websocket closed gracefully")
case Failure(e) => log.error("Test Websocket closed with an error\n", e)
}
}
当播放框架重新编译时,它会关闭 TestActor 但不会关闭 Akka 流。只有当 websocket 超时时,流才会关闭。
这是否意味着我需要手动关闭流,例如,在 TestActor 中发送使用 Source.actorRef
创建的 actor a PoisonPill
PostStop
函数?
注意:我还尝试注入 Materializer
和 Actorsystem
即:
@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem)
Play重新编译时,流被关闭,但也产生错误:
[error] a.a.ActorSystemImpl - Websocket handler failed with
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]]
terminated abruptly
在你的第一个例子中,你在你的演员中创建了一个演员系统。你不应该那样做 - 演员系统很昂贵,创建一个意味着启动线程池,启动调度程序等。另外,你永远不会关闭它,这意味着你遇到的问题比不关闭流要大得多- 你有资源泄漏,actor 系统创建的线程池永远不会关闭。
所以基本上,每次您收到 WebSocket 连接时,您都在创建一个带有一组新线程池的新参与者系统,并且您永远不会关闭它们。在生产环境中,即使负载很小(每秒几个请求),您的应用程序也会在几分钟内 运行 内存不足。
一般来说,在 Play 中,您永远不应该创建自己的 actor 系统,而应该注入一个。在 actor 中,您甚至不需要注入它,因为它会自动 - context.system
让您可以访问创建该 actor 的 actor 系统。与物化器类似,它们不是很重,但如果你为每个连接创建一个,如果你不关闭它,你也可能 运行 内存不足,所以你应该注入它。
所以当你注入它时,你会得到一个错误——这很难避免,但并非不可能。困难在于,Akka 本身并不能真正自动知道为了优雅地关闭事物需要关闭什么顺序,它应该先关闭你的演员,以便它可以优雅地关闭流,还是应该关闭流下来,以便他们可以通知您的演员他们已关闭并做出相应回应?
Akka 2.5 对此有一个解决方案,一个托管关闭序列,您可以在其中注册要在 Actor 系统开始以某种随机顺序杀死事物之前关闭的事物:
https://doc.akka.io/docs/akka/2.5/scala/actors.html#coordinated-shutdown
您可以将它与 Akka 流结合使用 kill switches 在应用程序的其余部分关闭之前优雅地关闭您的流。
但一般来说,关机错误是相当温和的,所以如果是我,我不会担心它们。
一个 actor 初始化一个连接到 websocket 的 Akka 流。这是通过使用 Source.actorRef
来完成的,消息可以发送到该消息,然后由 webSocketClientFlow
处理并由 Sink.foreach
消费。这可以在以下代码中看到(源自 akka docs):
class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging {
final implicit val system: ActorSystem = ActorSystem()
final implicit val materializer: ActorMaterializer = ActorMaterializer()
def receive = {
case _ =>
}
// Consume the incoming messages from the websocket.
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case misc => println(misc)
}
// Source through which we can send messages to the websocket.
val outgoing: Source[TextMessage, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com"))
// Materialized the stream
val ((ws,upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// Check whether the server has accepted the websocket request.
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Failed: ${upgrade.response.status}")
}
}
// When the connection has been established.
connected.onComplete(println)
// When the stream has closed
closed.onComplete {
case Success(_) => println("Test Websocket closed gracefully")
case Failure(e) => log.error("Test Websocket closed with an error\n", e)
}
}
当播放框架重新编译时,它会关闭 TestActor 但不会关闭 Akka 流。只有当 websocket 超时时,流才会关闭。
这是否意味着我需要手动关闭流,例如,在 TestActor 中发送使用 Source.actorRef
创建的 actor a PoisonPill
PostStop
函数?
注意:我还尝试注入 Materializer
和 Actorsystem
即:
@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem)
Play重新编译时,流被关闭,但也产生错误:
[error] a.a.ActorSystemImpl - Websocket handler failed with
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]]
terminated abruptly
在你的第一个例子中,你在你的演员中创建了一个演员系统。你不应该那样做 - 演员系统很昂贵,创建一个意味着启动线程池,启动调度程序等。另外,你永远不会关闭它,这意味着你遇到的问题比不关闭流要大得多- 你有资源泄漏,actor 系统创建的线程池永远不会关闭。
所以基本上,每次您收到 WebSocket 连接时,您都在创建一个带有一组新线程池的新参与者系统,并且您永远不会关闭它们。在生产环境中,即使负载很小(每秒几个请求),您的应用程序也会在几分钟内 运行 内存不足。
一般来说,在 Play 中,您永远不应该创建自己的 actor 系统,而应该注入一个。在 actor 中,您甚至不需要注入它,因为它会自动 - context.system
让您可以访问创建该 actor 的 actor 系统。与物化器类似,它们不是很重,但如果你为每个连接创建一个,如果你不关闭它,你也可能 运行 内存不足,所以你应该注入它。
所以当你注入它时,你会得到一个错误——这很难避免,但并非不可能。困难在于,Akka 本身并不能真正自动知道为了优雅地关闭事物需要关闭什么顺序,它应该先关闭你的演员,以便它可以优雅地关闭流,还是应该关闭流下来,以便他们可以通知您的演员他们已关闭并做出相应回应?
Akka 2.5 对此有一个解决方案,一个托管关闭序列,您可以在其中注册要在 Actor 系统开始以某种随机顺序杀死事物之前关闭的事物:
https://doc.akka.io/docs/akka/2.5/scala/actors.html#coordinated-shutdown
您可以将它与 Akka 流结合使用 kill switches 在应用程序的其余部分关闭之前优雅地关闭您的流。
但一般来说,关机错误是相当温和的,所以如果是我,我不会担心它们。