How do I call context.become outside of a Future from ask messages?

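I have a parent Akka actor named buildingCoordinator which creates children named elevator_X. For now I am creating only one elevator. The buildingCoordinator sends a sequence of messages and waits for the responses in order to move an elevator. The sequence is this: send ? RequestElevatorState -> receive ElevatorState -> send ? MoveRequest -> receive MoveRequestSuccess -> change the state. As you can see, I am using the ask pattern. After the move succeeds, the buildingCoordinator changes its state using context.become.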

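The problem I am running into is that the elevator receives MoveRequest(1,4) for the same floor twice, sometimes three times. I do remove the floor when I call context.become, but I remove it inside the last map. I think the cause is that I am calling context.become inside a Future and should call it outside. However, I am having trouble implementing that.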

case class BuildingCoordinator(actorName: String,
                               numberOfFloors: Int,
                               numberOfElevators: Int,
                               elevatorControlSystem: ElevatorControlSystem)
  extends Actor with ActorLogging {
  import context.dispatcher
  implicit val timeout = Timeout(4 seconds)
  val elevators = createElevators(numberOfElevators)

  override def receive: Receive = operational(Map[Int, Queue[Int]](), Map[Int, Queue[Int]]())

  def operational(stopsRequests: Map[Int, Queue[Int]], pickUpRequests: Map[Int, Queue[Int]]): Receive = {
    case msg@MoveElevator(elevatorId) =>
      println(s"[BuildingCoordinator] received $msg")
      val elevatorActor: ActorSelection = context.actorSelection(s"/user/$actorName/elevator_$elevatorId")
      val newState = (elevatorActor ? RequestElevatorState(elevatorId))
        .mapTo[ElevatorState]
        .flatMap { state =>
          val nextStop = elevatorControlSystem.findNextStop(stopsRequests.get(elevatorId).get, state.currentFloor, state.direction)
          elevatorActor ? MoveRequest(elevatorId, nextStop)
        }
        .mapTo[MoveRequestSuccess]
        .flatMap(moveRequestSuccess => elevatorActor ? MakeMove(elevatorId, moveRequestSuccess.targetFloor))
        .mapTo[MakeMoveSuccess]
        .map { makeMoveSuccess =>
          println(s"[BuildingCoordinator] Elevator ${makeMoveSuccess.elevatorId} arrived at floor [${makeMoveSuccess.floor}]")
          // removeStopRequest
          val stopsRequestsElevator = stopsRequests.get(elevatorId).getOrElse(Queue[Int]())
          val newStopsRequestsElevator = stopsRequestsElevator.filterNot(_ == makeMoveSuccess.floor)
          val newStopsRequests = stopsRequests + (elevatorId -> newStopsRequestsElevator)

          val pickUpRequestsElevator = pickUpRequests.get(elevatorId).getOrElse(Queue[Int]())
          val newPickUpRequestsElevator = {
            if (pickUpRequestsElevator.contains(makeMoveSuccess.floor)) {
              pickUpRequestsElevator.filterNot(_ == makeMoveSuccess.floor)
            } else {
              pickUpRequestsElevator
            }
          }
          val newPickUpRequests = pickUpRequests + (elevatorId -> newPickUpRequestsElevator)
          // I THINK I SHOULD NOT CALL context.become HERE
          // context.become(operational(newStopsRequests, newPickUpRequests))

          val dropOffFloor = BuildingUtil.generateRandomFloor(numberOfFloors, makeMoveSuccess.floor, makeMoveSuccess.direction)
          context.self ! DropOffRequest(makeMoveSuccess.elevatorId, dropOffFloor)

          (newStopsRequests, newPickUpRequests)
        }
      // I MUST CALL context.become HERE, BUT I DONT KNOW HOW
      // context.become(operational(newState.flatMap(state => (state._1, state._2))))
  }
}

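Another thing that might be ugly is the long chain of map and flatMap calls. This is the way I found to implement it, but I think there may be a better way.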

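You cannot and should not call context.become, or change actor state in any way, outside the Receive method and outside the thread that invokes the Receive method (which is an Akka dispatcher thread), as you do in your example. E.g.: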

def receive: Receive  = {
   // This is a bug, because context is not and is not supposed to be thread safe.
   case message: Message => Future(context.become(anotherReceive))
}

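What you should do: after the async operation finishes, send a message to self and change the state when you receive it. If you do not want to handle incoming messages in the meantime, you can stash them. See details: https://doc.akka.io/docs/akka/current/typed/stash.html

High-level example, technical details omitted: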

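import akka.actor.{Actor, Stash}
import akka.pattern.pipe
import scala.concurrent.Future

case object StartAsyncOperation
case class OperationFinished(calculations: Map[Any, Any])

class AsyncActor extends Actor with Stash {
  import context.dispatcher // ExecutionContext needed by map and pipeTo

  // Some implementation of a heavy async operation
  def operation: Future[Map[Any, Any]] = ???

  override def receive: Receive = receiveStartAsync(Map.empty)

  def receiveStartAsync(calculations: Map[Any, Any]): Receive = {
    case StartAsyncOperation =>
      // Start the async operation and inform yourself when it is finished
      operation.map(OperationFinished.apply) pipeTo self
      context.become(receiveWaitAsyncOperation)
  }

  def receiveWaitAsyncOperation: Receive = {
    case OperationFinished(calculations) =>
      // This runs inside receive, so changing the behavior here is safe
      unstashAll()
      context.become(receiveStartAsync(calculations))

    case _ => stash()
  }
}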
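
Applied to the coordinator from the question, the same idea could look roughly like the sketch below. It is not the asker's actual code: the message definitions are simplified, `CoordinatorSketch`, `pickNextStop` and `removeFloor` are hypothetical names, and `pickNextStop` merely stands in for elevatorControlSystem.findNextStop. The chain of map and flatMap calls becomes a for-comprehension, the final result is piped to self, and context.become is only ever called inside receive. While a move is in flight, further messages are stashed, which is presumably what prevents a second MoveRequest from being computed against the old state.

```scala
import akka.actor.{Actor, ActorRef, Stash, Status}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.collection.immutable.Queue
import scala.concurrent.duration._

// Hypothetical, simplified versions of the messages used in the question.
final case class MoveElevator(elevatorId: Int)
final case class RequestElevatorState(elevatorId: Int)
final case class ElevatorState(elevatorId: Int, currentFloor: Int)
final case class MoveRequest(elevatorId: Int, targetFloor: Int)
final case class MoveRequestSuccess(elevatorId: Int, targetFloor: Int)
final case class MakeMove(elevatorId: Int, targetFloor: Int)
final case class MakeMoveSuccess(elevatorId: Int, floor: Int)

object CoordinatorSketch {
  type Requests = Map[Int, Queue[Int]]

  // Pure state transition: drop the served floor from one elevator's queue.
  def removeFloor(requests: Requests, elevatorId: Int, floor: Int): Requests =
    requests + (elevatorId ->
      requests.getOrElse(elevatorId, Queue.empty[Int]).filterNot(_ == floor))

  // Assumption: pick the closest requested floor (stand-in for findNextStop).
  def pickNextStop(stops: Queue[Int], currentFloor: Int): Int =
    stops.minBy(floor => math.abs(floor - currentFloor))
}

class CoordinatorSketch(elevator: ActorRef) extends Actor with Stash {
  import CoordinatorSketch._
  import context.dispatcher
  implicit val timeout: Timeout = Timeout(4.seconds)

  override def receive: Receive = operational(Map.empty, Map.empty)

  def operational(stops: Requests, pickUps: Requests): Receive = {
    case MoveElevator(id) if stops.get(id).exists(_.nonEmpty) =>
      // The Future only reads immutable values; it never touches actor state.
      val arrival = for {
        state   <- (elevator ? RequestElevatorState(id)).mapTo[ElevatorState]
        nextStop = pickNextStop(stops(id), state.currentFloor)
        granted <- (elevator ? MoveRequest(id, nextStop)).mapTo[MoveRequestSuccess]
        arrived <- (elevator ? MakeMove(id, granted.targetFloor)).mapTo[MakeMoveSuccess]
      } yield arrived
      arrival.pipeTo(self)
      context.become(moving(stops, pickUps))
  }

  def moving(stops: Requests, pickUps: Requests): Receive = {
    case MakeMoveSuccess(id, floor) =>
      // Delivered as an ordinary message, so the state change happens in receive.
      unstashAll()
      context.become(
        operational(removeFloor(stops, id, floor), removeFloor(pickUps, id, floor)))
    case Status.Failure(_) =>
      // pipeTo delivers a failed or timed-out Future as Status.Failure.
      unstashAll()
      context.become(operational(stops, pickUps))
    case _ => stash()
  }
}
```

Keeping the queue update in a pure function such as removeFloor also makes it testable without starting an actor system.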

I like your answer, @Ivan Kurchenko. However, according to the Akka Stash docs:

When unstashing the buffered messages by calling unstashAll the messages will be processed sequentially in the order they were added and all are processed unless an exception is thrown. The actor is unresponsive to other new messages until unstashAll is completed. That is another reason for keeping the number of stashed messages low. Actors that hog the message processing thread for too long can result in starvation of other actors.

This means that under load, for example, an unstashAll operation can starve the other actors.

According to the same docs:

That can be mitigated by using the StashBuffer.unstash with numberOfMessages parameter and then send a message to context.self before continuing unstashing more. That means that other new messages may arrive in-between and those must be stashed to keep the original order of messages. It becomes more complicated, so better keep the number of stashed messages low.
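
The quoted mitigation refers to the Akka Typed StashBuffer rather than the classic Stash trait used in the answer. A minimal sketch of it might look as follows. The protocol (`Work`, `ResourceReady`), the internal messages (`Replayed`, `ContinueUnstash`), the batch size and the stash capacity are all assumptions made for illustration. Replayed messages are wrapped so they can be told apart from new arrivals, and new arrivals are stashed while draining so the original order is kept.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}

object BatchedUnstash {
  sealed trait Command
  final case class Work(id: Int) extends Command
  case object ResourceReady extends Command
  // Internal messages (hypothetical names).
  private final case class Replayed(original: Command) extends Command
  private case object ContinueUnstash extends Command

  private val BatchSize = 10 // assumption: tune to the workload

  def apply(): Behavior[Command] =
    Behaviors.withStash[Command](capacity = 100)(buffer => waiting(buffer))

  // Not ready yet: buffer everything until ResourceReady arrives.
  private def waiting(buffer: StashBuffer[Command]): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case ResourceReady =>
          context.self ! ContinueUnstash
          draining(buffer)
        case other =>
          buffer.stash(other)
          Behaviors.same
      }
    }

  // Replay the buffer in batches; messages arriving in between are stashed.
  private def draining(buffer: StashBuffer[Command]): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case ContinueUnstash if buffer.isEmpty =>
          active
        case ContinueUnstash =>
          // Schedule the next batch first, then replay at most BatchSize messages.
          context.self ! ContinueUnstash
          buffer.unstash(draining(buffer), BatchSize, Replayed(_))
        case Replayed(original) =>
          handle(context, original)
          Behaviors.same
        case newMessage =>
          buffer.stash(newMessage)
          Behaviors.same
      }
    }

  // Buffer fully drained: handle messages directly.
  private def active: Behavior[Command] =
    Behaviors.receive { (context, message) =>
      handle(context, message)
      Behaviors.same
    }

  private def handle(context: ActorContext[Command], message: Command): Unit =
    message match {
      case Work(id) => context.log.info(s"Processing work item $id")
      case _        => ()
    }
}
```

Because other actors get a chance to run between batches, this avoids one long unstashAll, at the cost of the extra bookkeeping the docs warn about. Note that stashing fails once the buffer reaches its capacity, so the capacity still has to be sized for the expected load.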

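Bottom line: you should keep the number of stashed messages low. Stashing may not be a good fit for operations under heavy load.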