How do I call context.become outside of a Future when using the ask pattern?
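I have a parent Akka actor named buildingCoordinator that creates children named elevator_X. For now I am creating only one elevator. The buildingCoordinator sends a sequence of messages and waits for the responses in order to move an elevator. The sequence is: send ? RequestElevatorState -> receive ElevatorState -> send ? MoveRequest -> receive MoveRequestSuccess -> change state. As you can see, I am using the ask pattern. After the move succeeds, the buildingCoordinator changes its state using context.become.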
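The problem I am running into is that the elevator receives the message MoveRequest(1,4) for the same floor twice, sometimes three times. I do remove the floor when I call context.become, but I do that removal inside the last map. I think the cause is that I am calling context.become inside a Future and I should call it outside. However, I am having trouble implementing that.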
case class BuildingCoordinator(actorName: String,
                               numberOfFloors: Int,
                               numberOfElevators: Int,
                               elevatorControlSystem: ElevatorControlSystem)
  extends Actor with ActorLogging {
  import context.dispatcher
  implicit val timeout = Timeout(4 seconds)
  val elevators = createElevators(numberOfElevators)
  override def receive: Receive = operational(Map[Int, Queue[Int]](), Map[Int, Queue[Int]]())
  def operational(stopsRequests: Map[Int, Queue[Int]], pickUpRequests: Map[Int, Queue[Int]]): Receive = {
    case msg@MoveElevator(elevatorId) =>
      println(s"[BuildingCoordinator] received $msg")
      val elevatorActor: ActorSelection = context.actorSelection(s"/user/$actorName/elevator_$elevatorId")
      val newState = (elevatorActor ? RequestElevatorState(elevatorId))
        .mapTo[ElevatorState]
        .flatMap { state =>
          val nextStop = elevatorControlSystem.findNextStop(stopsRequests.get(elevatorId).get, state.currentFloor, state.direction)
          elevatorActor ? MoveRequest(elevatorId, nextStop)
        }
        .mapTo[MoveRequestSuccess]
        .flatMap(moveRequestSuccess => elevatorActor ? MakeMove(elevatorId, moveRequestSuccess.targetFloor))
        .mapTo[MakeMoveSuccess]
        .map { makeMoveSuccess =>
          println(s"[BuildingCoordinator] Elevator ${makeMoveSuccess.elevatorId} arrived at floor [${makeMoveSuccess.floor}]")
          // removeStopRequest
          val stopsRequestsElevator = stopsRequests.get(elevatorId).getOrElse(Queue[Int]())
          val newStopsRequestsElevator = stopsRequestsElevator.filterNot(_ == makeMoveSuccess.floor)
          val newStopsRequests = stopsRequests + (elevatorId -> newStopsRequestsElevator)
          val pickUpRequestsElevator = pickUpRequests.get(elevatorId).getOrElse(Queue[Int]())
          val newPickUpRequestsElevator = {
            if (pickUpRequestsElevator.contains(makeMoveSuccess.floor)) {
              pickUpRequestsElevator.filterNot(_ == makeMoveSuccess.floor)
            } else {
              pickUpRequestsElevator
            }
          }
          val newPickUpRequests = pickUpRequests + (elevatorId -> newPickUpRequestsElevator)
          // I THINK I SHOULD NOT CALL context.become HERE
          // context.become(operational(newStopsRequests, newPickUpRequests))
          val dropOffFloor = BuildingUtil.generateRandomFloor(numberOfFloors, makeMoveSuccess.floor, makeMoveSuccess.direction)
          context.self ! DropOffRequest(makeMoveSuccess.elevatorId, dropOffFloor)
          (newStopsRequests, newPickUpRequests)
        }
      // I MUST CALL context.become HERE, BUT I DONT KNOW HOW
      // context.become(operational(newState.flatMap(state => (state._1, state._2))))
  }
Another thing that may be ugly is the long chain of map and flatMap calls. This is how I implemented it, but I suspect there is a better way.
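You cannot and should not call context.become, or change the actor's state in any other way, outside the Receive method and outside the thread that invokes Receive (an Akka dispatcher thread), which is what happens in your example. For instance: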
def receive: Receive = {
  // This is a bug, because context is not and is not supposed to be thread safe.
  case message: Message => Future(context.become(anotherReceive))
}
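What you should do instead is send a message to self once the asynchronous operation has finished, and change the state when you receive that message. If you do not want to process other incoming messages in the meantime, you can stash them. For details see: https://doc.akka.io/docs/akka/current/typed/stash.html
High-level example, with technical details omitted: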
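import akka.actor.{Actor, Stash}
import akka.pattern.pipe
import scala.concurrent.Future

case object StartAsyncOperation
case class OperationFinished(calculations: Map[Any, Any])

class AsyncActor extends Actor with Stash {
  import context.dispatcher // ExecutionContext needed by map and pipeTo

  def operation: Future[Map[Any, Any]] = ??? // some implementation of a heavy async operation

  override def receive: Receive = receiveStartAsync(Map.empty)

  def receiveStartAsync(calculations: Map[Any, Any]): Receive = {
    case StartAsyncOperation =>
      // Start the async operation and inform yourself when it has finished
      operation.map(OperationFinished.apply) pipeTo self
      context.become(receiveWaitAsyncOperation)
  }

  def receiveWaitAsyncOperation: Receive = {
    case OperationFinished(calculations) =>
      // Handled as an ordinary message, so changing behavior here is safe
      unstashAll()
      context.become(receiveStartAsync(calculations))
    case _ => stash() // hold everything else until the operation completes
  }
}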
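Applied to the question, the same idea might look like the sketch below. It is only an illustration under several assumptions: the message fields are guessed from the question, `Coordinator`, `Requests`, `initialStops` and the `elevator` reference are hypothetical names, the pick-up map is left out for brevity, and the "nearest pending stop" rule merely stands in for `findNextStop`. It also uses a for-comprehension in place of the long map/flatMap chain.

```scala
import akka.actor.{Actor, ActorRef, Stash, Status}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.collection.immutable.Queue
import scala.concurrent.duration._

// Message names mirror the question; their exact fields are assumptions.
case class MoveElevator(elevatorId: Int)
case class RequestElevatorState(elevatorId: Int)
case class ElevatorState(currentFloor: Int)
case class MakeMove(elevatorId: Int, targetFloor: Int)
case class MakeMoveSuccess(elevatorId: Int, floor: Int)

object Requests {
  type ByElevator = Map[Int, Queue[Int]]

  // Pure helper: drop `floor` from one elevator's queue of requests.
  def removeFloor(requests: ByElevator, elevatorId: Int, floor: Int): ByElevator =
    requests + (elevatorId -> requests.getOrElse(elevatorId, Queue.empty[Int]).filterNot(_ == floor))
}

// `elevator` and `initialStops` are hypothetical constructor parameters.
class Coordinator(elevator: ActorRef, initialStops: Requests.ByElevator) extends Actor with Stash {
  import context.dispatcher // ExecutionContext for the Futures below
  implicit val timeout: Timeout = Timeout(4.seconds)

  override def receive: Receive = operational(initialStops)

  def operational(stops: Requests.ByElevator): Receive = {
    case MoveElevator(id) if stops.get(id).exists(_.nonEmpty) =>
      val pending = stops(id) // immutable value, so safe to read inside the Future
      val moved = for {
        state <- (elevator ? RequestElevatorState(id)).mapTo[ElevatorState]
        // Stand-in for findNextStop: pick the nearest pending stop.
        nextStop = pending.minBy(stop => math.abs(stop - state.currentFloor))
        success <- (elevator ? MakeMove(id, nextStop)).mapTo[MakeMoveSuccess]
      } yield success
      moved.pipeTo(self) // the Future only delivers a message; it never touches context
      context.become(moving(stops))
  }

  def moving(stops: Requests.ByElevator): Receive = {
    case MakeMoveSuccess(id, floor) =>
      // Processed as a normal message, so changing behavior is safe here.
      unstashAll()
      context.become(operational(Requests.removeFloor(stops, id, floor)))
    case Status.Failure(_) =>
      // A timed-out or failed ask arrives as Status.Failure; keep the old state.
      unstashAll()
      context.become(operational(stops))
    case _ => stash() // e.g. further MoveElevator messages while a move is in flight
  }
}
```

Because a second MoveElevator is stashed until the first move has been recorded, the next stop is always computed from the updated state, which is what should prevent the duplicate MoveRequest described in the question.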
I like your answer, @Ivan Kurchenko. However, the Akka Stash docs say:
When unstashing the buffered messages by calling unstashAll the messages will be processed sequentially in the order they were added and all are processed unless an exception is thrown. The actor is unresponsive to other new messages until unstashAll is completed. That is another reason for keeping the number of stashed messages low. Actors that hog the message processing thread for too long can result in starvation of other actors.
This means that under load, for example, an unstashAll operation can starve other actors.
According to the same docs:
That can be mitigated by using the StashBuffer.unstash with numberOfMessages parameter and then send a message to context.self before continuing unstashing more. That means that other new messages may arrive in-between and those must be stashed to keep the original order of messages. It becomes more complicated, so better keep the number of stashed messages low.
Bottom line: you should keep the number of stashed messages low. Stashing may not be a good fit for operations under load.
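The batching idea from the second quote can be modelled in plain Scala, without any Akka API. This is only a toy model of the bookkeeping, not the StashBuffer implementation; `BatchedUnstash`, `nextBatch` and `arriveDuringDrain` are hypothetical names.

```scala
import scala.collection.immutable.Queue

object BatchedUnstash {
  // Take at most `batchSize` messages off the front of the buffer;
  // returns (messages to process now, messages still buffered).
  def nextBatch[A](buffer: Queue[A], batchSize: Int): (Queue[A], Queue[A]) =
    buffer.splitAt(batchSize)

  // Messages that arrive between two batches go to the back of the
  // buffer, so the original ordering is preserved.
  def arriveDuringDrain[A](remaining: Queue[A], arrived: Seq[A]): Queue[A] =
    remaining ++ arrived
}
```

Each batch is bounded, so the actor gives up the thread regularly, but the second function shows the extra bookkeeping the docs warn about: anything arriving mid-drain has to be buffered too.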