为什么 Play 框架不关闭 Akka Stream?

Why doesn't Play framework close the Akka Stream?

一个 actor 初始化一个连接到 websocket 的 Akka 流。这是通过使用 Source.actorRef 来完成的,消息可以发送到该消息,然后由 webSocketClientFlow 处理并由 Sink.foreach 消费。这可以在以下代码中看到(源自 akka docs):

class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging {

  final implicit val system: ActorSystem = ActorSystem()
  final implicit val materializer: ActorMaterializer = ActorMaterializer()

  def receive = {
    case _ =>
  }

  // Consume the incoming messages from the websocket.
  val incoming: Sink[Message, Future[Done]] =
  Sink.foreach[Message] {
    case message: TextMessage.Strict =>
      println(message.text)
    case misc => println(misc)
  }

  // Source through which we can send messages to the websocket.
  val outgoing: Source[TextMessage, ActorRef] =
  Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)

  // flow to use (note: not re-usable!)
  val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com"))

  // Materialized the stream
  val ((ws,upgradeResponse), closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.both)
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()

  // Check whether the server has accepted the websocket request.
  val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(s"Failed: ${upgrade.response.status}")
    }
  }

  // When the connection has been established.
  connected.onComplete(println)

  // When the stream has closed
  closed.onComplete {
    case Success(_) => println("Test Websocket closed gracefully")
    case Failure(e) => log.error("Test Websocket closed with an error\n", e)
  }

}

当播放框架重新编译时,它会关闭 TestActor 但不会关闭 Akka 流。只有当 websocket 超时时,流才会关闭。

这是否意味着我需要手动关闭流,例如,在 TestActor 中发送使用 Source.actorRef 创建的 actor a PoisonPill PostStop函数?

注意:我还尝试注入 MaterializerActorsystem 即:

@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem)

Play重新编译时,流被关闭,但也产生错误:

[error] a.a.ActorSystemImpl - Websocket handler failed with
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]] 
terminated abruptly

在你的第一个例子中,你在你的演员中创建了一个演员系统。你不应该那样做 - 演员系统很昂贵,创建一个意味着启动线程池,启动调度程序等。另外,你永远不会关闭它,这意味着你遇到的问题比不关闭流要大得多- 你有资源泄漏,actor 系统创建的线程池永远不会关闭。

所以基本上,每次您收到 WebSocket 连接时,您都在创建一个带有一组新线程池的新参与者系统,并且您永远不会关闭它们。在生产环境中,即使负载很小(每秒几个请求),您的应用程序也会在几分钟内 运行 内存不足。

一般来说,在 Play 中,您永远不应该创建自己的 actor 系统,而应该注入一个。在 actor 中,您甚至不需要注入它,因为它会自动 - context.system 让您可以访问创建该 actor 的 actor 系统。与物化器类似,它们不是很重,但如果你为每个连接创建一个,如果你不关闭它,你也可能 运行 内存不足,所以你应该注入它。

所以当你注入它时,你会得到一个错误——这很难避免,但并非不可能。困难在于,Akka 本身并不能真正自动知道为了优雅地关闭事物需要关闭什么顺序,它应该先关闭你的演员,以便它可以优雅地关闭流,还是应该关闭流下来,以便他们可以通知您的演员他们已关闭并做出相应回应?

Akka 2.5 对此有一个解决方案,一个托管关闭序列,您可以在其中注册要在 Actor 系统开始以某种随机顺序杀死事物之前关闭的事物:

https://doc.akka.io/docs/akka/2.5/scala/actors.html#coordinated-shutdown

您可以将它与 Akka 流结合使用 kill switches 在应用程序的其余部分关闭之前优雅地关闭您的流。

但一般来说,关机错误是相当温和的,所以如果是我,我不会担心它们。