Akka 在停止 actor 之前等待 Ack

Akka wait for Ack before stopping actor

我将 Plays play.api.mvc.WebSockets 与 akka.contrib.pattern.DistributedPubSub 事件结合使用,这已经很好用了。

class SomeSocketActor(out: ActorRef) extends Actor {
    val mediator = DistributedPubSubExtension(context.system).mediator
    mediator ! Subscribe("some_group", self)

    def receive: Actor.Receive = {
      case SubscribeAck(Subscribe("some_group", None, `self`)) =>
        context become ready
    }

    def ready: Actor.Receive = {
      // ...
    }

    override def postStop(): Unit = {
      mediator ! Unsubscribe("some_group", self)
    }
}

一旦套接字关闭,它就会发送 Unsubscribe。一旦 UnsubscribeDistributedPubSubMediator 接收到,它就会用 UnsubscribeAck 回复。然而,因为在那一刻(在 postStop 之后)actor 已经停止并且 UnsubscribeAck 被移动了 Akkas 死信邮箱并且我的日志中充斥着类似以下内容的垃圾邮件:

Message [akka.contrib.pattern.DistributedPubSubMediator$UnsubscribeAck] from Actor[akka://application/user/distributedPubSubMediator#261175455] to Actor[akka://application/system/websockets/24/handler#325798268] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

我知道我可以按照日志消息中的建议进行操作,但这似乎不是一个好的做法。有没有办法告诉 postStop 方法中的演员在停止前等待 UnsubscribeAck

不在postStop方法中,太晚了。

但是,与其在收到 UnsubscribeAck 之前停止,不如 become 等待 UnsubscribeAck 并在收到它时停止(或者在超时后停止,以防 UnsubscribeAck 从未收到。

这实际上应该可以正常工作,因为 DistributedPubSubMediator 会在您订阅时监视您的演员 Terminated。在一个新项目中,它确实按预期工作,调解员为您退订,因此您可能不需要做任何事情。我保留这个答案,因为它在我的项目中没有这样做,而且它可能会影响其他人。

我正在做的是编写第二个长期存在的 actor(您可以在控制器中创建它),并让 WebSocket actor 与其对话以管理订阅。这样新的 actor 可以传递取消订阅消息,然后它可以接收并忽略 UnsubscribeAck.