Akka Ask keeps timing out in a merge sort implementation

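I am a complete beginner with Akka and Scala. As my first project I decided to implement merge sort, but instead of recursing I instantiate new actors that perform the splitting and merging. My system seems to reach the leaves of the merge sort tree, and a few merges even happen, but then it stalls and I get an AskTimeoutException. I ran into a similar problem in another project that used Ask. Can anyone point me in the right direction?

ParentMerger receive implementation: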

  def receive = {
    case ParentMerger.Begin => {
      implicit var timeout = Timeout(60.seconds)
      println("Parent sending off first halves")
      // Assumption: at the beginning the array size is 2 or greater
      var arrayFuture1 = mergers(0) ? ParentMerger.SendHalf(array.slice(0, array.length/2))
      var arrayFuture2 = mergers(1) ? ParentMerger.SendHalf(array.slice(array.length/2, array.length))

      arrayFuture1.onComplete {
        case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => {
          arrayFuture2.onComplete {
            case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => {
              print(merge(arr1, arr2).toString())
              println("Final merge done")
            }
          }
        }
      }
    }
  }

Merger receive implementation:

  def receive = {
    case ParentMerger.SendHalf(array: ArrayBuffer[Int]) if array.length == 1 => {
      println("Child received array of size 1")
      sender() ! Merger.Reply(array)
    }
    case ParentMerger.SendHalf(array: ArrayBuffer[Int]) if array.length >= 2 => {
      println("Child received an array of size >= 2")
      for(i <- 0 to 1) {
        mergers(i) = context.actorOf(Props[Merger])
      }

      implicit var timeout = Timeout(60.seconds)
      var arrayFuture1 = mergers(0) ? ParentMerger.SendHalf(array.slice(0, array.length/2))
      var arrayFuture2 = mergers(1) ? ParentMerger.SendHalf(array.slice(array.length/2, array.length))

      arrayFuture1.onComplete {
        case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => {
          arrayFuture2.onComplete {
            case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => {
              println("Child merge")
              sender() ! Merger.Reply(merge(arr1, arr2))
            }
          }
        }
      }
    }
  }

The output I get:

Parent sending off first halves
Child received an array of size >= 2
Child received an array of size >= 2
Child received an array of size >= 2
Child received an array of size >= 2
Child received array of size 1
Child received array of size 1
Child received an array of size >= 2
Child received an array of size >= 2
Child received array of size 1
Child received an array of size >= 2
Child received array of size 1
Child received array of size 1
Child received array of size 1
Child received an array of size >= 2
Child received array of size 1
Child received array of size 1
Child merge
Child merge
Child received array of size 1
Child merge
Child merge
Child received array of size 1
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$a/$b/$b#-1137516511] to Actor[akka://Main/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$b/$a#2073409209] to Actor[akka://Main/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$a/$a#-1459967586] to Actor[akka://Main/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [02/14/2017 08:35:32.412] [Main-akka.actor.default-dispatcher-13] [akka://Main/deadLetters] Message [Merger$Reply] from Actor[akka://Main/user/app/$b/$b/$b#-2142577608] to Actor[akka://Main/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
scala.MatchError: Failure(akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://Main/user/app/$a#520493765]] after [60000 ms]. Sender[Actor[akka://Main/user/app#1966408365]] sent message of type "ParentMerger$SendHalf".) (of class scala.util.Failure)
    at ParentMerger$$anonfun$receive.$anonfun$applyOrElse(ParentMerger.scala:67)
    at ParentMerger$$anonfun$receive.$anonfun$applyOrElse$adapted(ParentMerger.scala:66)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

You are replying to the wrong actor:

sender() ! Merger.Reply(merge(arr1, arr2))

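Calling sender() from inside a Future callback is unreliable. The callback runs later, on another thread, and by then the actor may be handling a different message or none at all, so sender() can return a different ActorRef or deadLetters (which matches the dead-letter entries in your log). Capture the sender outside the onComplete block: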

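  // Read sender() while the actor is still processing the SendHalf message
  val originalSender = sender()
  arrayFuture1.onComplete {
    case Success(Merger.Reply(arr1: ArrayBuffer[Int])) => {
      arrayFuture2.onComplete {
        case Success(Merger.Reply(arr2: ArrayBuffer[Int])) => {
          println("Child merge")
          // Reply to the captured reference, not to whatever sender() returns now
          originalSender ! Merger.Reply(merge(arr1, arr2))
        }
      }
    }
  }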
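
To see why the captured value behaves differently, here is a minimal sketch that uses only the Scala standard library and no Akka at all. The names `SenderCaptureDemo`, `currentSender` and `run` are made up for illustration; `currentSender` merely stands in for the mutable state that sender() reads inside a real actor, so treat this as an analogy rather than a description of Akka's internals.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for an actor. `currentSender` plays the role of the
// mutable state behind Akka's sender(); it is NOT an Akka API.
object SenderCaptureDemo {
  @volatile private var currentSender: String = "deadLetters"

  // Returns (value captured before the callback, value read inside the callback).
  def run(): (String, String) = {
    val childReply = Promise[Int]()  // completed later, like the reply to an ask
    currentSender = "parent"         // the "actor" starts handling a message from "parent"
    val captured = currentSender     // analogous to: val originalSender = sender()

    // The callback reads the live field only when the future completes.
    val seen: Future[(String, String)] =
      childReply.future.map(_ => (captured, currentSender))

    currentSender = "deadLetters"    // message handling is over, the "actor" has moved on
    childReply.success(42)           // the reply arrives only now
    Await.result(seen, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    val (captured, live) = run()
    // prints "captured = parent, read inside callback = deadLetters"
    println(s"captured = $captured, read inside callback = $live")
  }
}
```

The value stored in a local val before the callback is registered stays "parent", while the field read inside the callback has already changed. The same reasoning applies to any actor state touched from a Future callback, not only sender().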