让 Akka actor 等待外部输入而不挂起其他 actor

Getting Akka actor to wait for external inputs without hanging other actors

阿卡 (Java API) 这里。假设我有两个演员,WatchdogBlockingListener。他们都被注入了对彼此的引用:

// WARNING!
// All code here is just Groovy-pseudocode
class Watchdog extends UntypedActor {
    ActorRef blockingListener

    @Override
    void onReceive(Object message) {
        if(message in Init) {
            blockingListener.tell(new StartListening(), self())
        } else if(message in SomethingHappened) {
            ...
        }
    }
}

class BlockingListener extends UntypedActor {
    ActorRef watchdog

    @Override
    void onReceive(Object message) {
        if(message in StartListening) {
            while(true) {
                // Block and scan for input
                String event = waitForNextEvent()

                watchdog.tell(new SomethingHappened(event), self())
            }
        }
    }
}

我担心 BlockingListener 中的 while(true) 循环会妨碍两个演员之间的正常交流。 会吗? 我担心当 BlockingListener 被告知 StartListening 时,它会:

  1. 进入while(true)循环;然后
  2. 创建BlockingListener线程hang/wait直到一个事件(actor系统之外)发生;然后
  3. 尝试告诉Watchdog关于SomethingHappened(event)但是...
  4. Watchdog 永远不会收到 SomethingHappened 因为它仍在等待自己的 Init 消息完成处理

我说得对吗?如果是这样,这里的补救措施是什么?我们如何让 BlockingListener 监听 actor 系统之外的事件并响应它们?

ActorSystem包含一个默认调度器,它本质上是一个线程池,您可以配置自己的单独调度器,但如果您没有,将使用默认调度器。

当有人发送 WatchDog 消息时 Init 它将被调度到 运行 在调度程序的线程之一上,它将发送 StartListening 消息到blockingListener 然后 return 线程到调度程序(当接收方法完成时)。

然而,

BlockingListener 将在有人发送它时 StartListening 分配一个线程,它将永远保留并且永远不会 return 给调度程序。当它向 watchdog 发送消息时,如果 watchdog 能够对该消息作出反应,它将取决于可用线程的数量。

一般来说,Akka 是围绕非阻塞设计的,所以你应该尽可能避免这种情况,但在某些情况下,阻塞可能是不可避免的。

在这种情况下您应该做的是将这些 actor 隔离到一个单独的调度程序中(并且更喜欢基于线程池的调度程序而不是默认的 Fork Join Pool),这将确保您的 actor 不会消耗一个线程系统其他参与者的问题。

但是还有一个问题,即使你将 actor 隔离在它自己的线程池中,那是因为你的 actor 卡在了 while 循环中(一个 actor 只会同时在一个线程中执行时间)它不会对收到停止消息做出反应,甚至不会关闭 actor 系统 - actor 系统只会挂起。

这可以通过周期性地打破循环来解决,向 actor 发送一条消息以继续,然后将最后添加到 actor 邮箱中并允许它处理其他消息并可能停止。这需要 waitForNextEvent 方法有某种超时,这样它就不会无限期地阻塞。