Akka 问模式:如何使请求和回复上下文感知?
Akka ask pattern: how to make request and replies context-aware?
对于一项大学作业,我必须在 Akka 中实现 Raft 协议的模拟(我正在使用 Akka 类型,使用 Behaviors)。
在 Raft 协议中,参与者之间的交互在请求和响应之间有一个 1:1 映射;必须及时作出答复。
因此,如 Request-Response with ask between two actors 示例中的文档所示,使用 ask
模式是有意义的。
在我的实现中,请求和响应必须是上下文感知的:这意味着,当执行查询的参与者收到响应时,它必须知道响应是针对什么查询的。文档中的示例建议在消息中包含查询 ID。
我需要解决的问题可以用下面的例子来描述:
- Actor A 向 actor B 发送 ID=1 的查询(它在消息中包含查询 ID)。
- B没有及时回复(可能是网络慢,也可能是B本身慢),A重新向B发出ID=2的query
- Actor B 收到 ID=1 的查询,并回复给 actor A(在消息中包含查询 ID)。
- Actor A 收到 B 的 ID=1 的回复。 A 知道它发送的最后一个查询有 ID=2,因此不能处理回复而是等待 ID=2 的回复。
我认为,为了“过滤”没有正确查询 ID 的回复,我可以在 actor A 中放置一个 BehaviorInterceptor 来检查回复中的 ID 是否与预期的查询 ID 匹配。
总结:
- Actor A 在 hashmap 中写入预期来自 actor B 的下一个回复的查询 ID,
- 拦截器使用此哈希图检查回复中的 ID。
这样的设计好吗?
另外,我不明白ask
是否阻塞。
理想情况下,我想以非阻塞方式使用 ask
:actor A ask
s actor B,并且,在等待 B 的回复时,A 可以进行其他操作。
在等待 B 的回复时,Actor A 也可以根据需要更改其行为(也是不处理 B 回复的行为)。
感谢您的任何见解!
两个演员之间的提问(使用 ActorContext
)是 non-blocking。
由于对给定目标的请求的高水位线是参与者协议状态的重要组成部分,我将把它存储在请求参与者的状态中(例如在 Scala 中 Map[ActorRef[Request], Int]
)。适应的响应包含目标和它响应的 id(您定义在执行请求时如何合并);当收到适应的响应时,第一件事是将响应中的 id 与目标的高水位线进行比较。
在 Scala 中,例如:
sealed trait RequestA
case class QueryB(target: ActorRef[RequestB]) extends RequestA
case class ResponseFromB(target: ActorRef[RequestB], id: Int, resp: ResponseB) extends RequestA
case class BTimedOut(target: ActorRef[RequestB], id: Int) extends RequestA
sealed trait RequestB
def buildRequestB(id: Int)(replyTo: ResponseB): RequestB = ???
sealed trait ResponseB
def aBehavior(highWater: Map[ActorRef[RequestB], Int]): Behavior[RequestA] =
Behaviors.receive { (context, msg) =>
case QueryB(target) =>
implicit val timeout: Timeout = 10.seconds
val nextHighwater = highWater.get(target).map(_ + 1).getOrElse(0)
// request is sent and received "in the background"
context.ask(target, buildRequestB(nextHighwater)) {
case Success(resp) => ResponseFromB(target, nextHighwater, resp)
case Failure(_) => BTimedOut(target, nextHighwater)
}
aBehavior(highWater + (target -> nextHighwater))
case ResponseFromB(target, id, resp) =>
if (highWater.get(target).contains(id)) {
context.log.info("Accepting response: {}", resp)
Behaviors.same
} else {
context.log.info("Ignoring response: {}", resp)
Behaviors.same
}
case BTimedOut(target, id) =>
context.log.warning("Ask of {} (sequence ID {}) timed out", target, id)
Behaviors.same
}
对于一项大学作业,我必须在 Akka 中实现 Raft 协议的模拟(我正在使用 Akka 类型,使用 Behaviors)。
在 Raft 协议中,参与者之间的交互在请求和响应之间有一个 1:1 映射;必须及时作出答复。
因此,如 Request-Response with ask between two actors 示例中的文档所示,使用 ask
模式是有意义的。
在我的实现中,请求和响应必须是上下文感知的:这意味着,当执行查询的参与者收到响应时,它必须知道响应是针对什么查询的。文档中的示例建议在消息中包含查询 ID。
我需要解决的问题可以用下面的例子来描述:
- Actor A 向 actor B 发送 ID=1 的查询(它在消息中包含查询 ID)。
- B没有及时回复(可能是网络慢,也可能是B本身慢),A重新向B发出ID=2的query
- Actor B 收到 ID=1 的查询,并回复给 actor A(在消息中包含查询 ID)。
- Actor A 收到 B 的 ID=1 的回复。 A 知道它发送的最后一个查询有 ID=2,因此不能处理回复而是等待 ID=2 的回复。
我认为,为了“过滤”没有正确查询 ID 的回复,我可以在 actor A 中放置一个 BehaviorInterceptor 来检查回复中的 ID 是否与预期的查询 ID 匹配。
总结:
- Actor A 在 hashmap 中写入预期来自 actor B 的下一个回复的查询 ID,
- 拦截器使用此哈希图检查回复中的 ID。 这样的设计好吗?
另外,我不明白ask
是否阻塞。
理想情况下,我想以非阻塞方式使用 ask
:actor A ask
s actor B,并且,在等待 B 的回复时,A 可以进行其他操作。
在等待 B 的回复时,Actor A 也可以根据需要更改其行为(也是不处理 B 回复的行为)。
感谢您的任何见解!
两个演员之间的提问(使用 ActorContext
)是 non-blocking。
由于对给定目标的请求的高水位线是参与者协议状态的重要组成部分,我将把它存储在请求参与者的状态中(例如在 Scala 中 Map[ActorRef[Request], Int]
)。适应的响应包含目标和它响应的 id(您定义在执行请求时如何合并);当收到适应的响应时,第一件事是将响应中的 id 与目标的高水位线进行比较。
在 Scala 中,例如:
sealed trait RequestA
case class QueryB(target: ActorRef[RequestB]) extends RequestA
case class ResponseFromB(target: ActorRef[RequestB], id: Int, resp: ResponseB) extends RequestA
case class BTimedOut(target: ActorRef[RequestB], id: Int) extends RequestA
sealed trait RequestB
def buildRequestB(id: Int)(replyTo: ResponseB): RequestB = ???
sealed trait ResponseB
def aBehavior(highWater: Map[ActorRef[RequestB], Int]): Behavior[RequestA] =
Behaviors.receive { (context, msg) =>
case QueryB(target) =>
implicit val timeout: Timeout = 10.seconds
val nextHighwater = highWater.get(target).map(_ + 1).getOrElse(0)
// request is sent and received "in the background"
context.ask(target, buildRequestB(nextHighwater)) {
case Success(resp) => ResponseFromB(target, nextHighwater, resp)
case Failure(_) => BTimedOut(target, nextHighwater)
}
aBehavior(highWater + (target -> nextHighwater))
case ResponseFromB(target, id, resp) =>
if (highWater.get(target).contains(id)) {
context.log.info("Accepting response: {}", resp)
Behaviors.same
} else {
context.log.info("Ignoring response: {}", resp)
Behaviors.same
}
case BTimedOut(target, id) =>
context.log.warning("Ask of {} (sequence ID {}) timed out", target, id)
Behaviors.same
}