喜欢:响应的顺序
Akka: The order of responses
我的演示应用程序很简单。这是一个演员:
class CounterActor extends Actor {
@volatile private[this] var counter = 0
def receive: PartialFunction[Any, Unit] = {
case Count(id) ⇒ sender ! self ? Increment(id)
case Increment(id) ⇒ sender ! {
counter += 1
println(s"req_id=$id, counter=$counter")
counter
}
}
}
主应用程序:
sealed trait ActorMessage
case class Count(id: Int = 0) extends ActorMessage
case class Increment(id: Int) extends ActorMessage
object CountingApp extends App {
// Get incremented counter
val future0 = counter ? Count(1)
val future1 = counter ? Count(2)
val future2 = counter ? Count(3)
val future3 = counter ? Count(4)
val future4 = counter ? Count(5)
// Handle response
handleResponse(future0)
handleResponse(future1)
handleResponse(future2)
handleResponse(future3)
handleResponse(future4)
// Bye!
exit()
}
我的经纪人:
def handleResponse(future: Future[Any]): Unit = {
future.onComplete {
case Success(f) => f.asInstanceOf[Future[Any]].onComplete {
case x => x match {
case Success(n) => println(s" -> $n")
case Failure(t) => println(s" -> ${t.getMessage}")
}
}
case Failure(t) => println(t.getMessage)
}
}
如果我 运行 应用我会看到下一个输出:
req_id=1, counter=1
req_id=2, counter=2
req_id=3, counter=3
req_id=4, counter=4
req_id=5, counter=5
-> 4
-> 1
-> 5
-> 3
-> 2
处理响应的顺序是随机的。这是正常行为吗?如果没有,如何订购?
PS
actor 中需要 volatile var 吗?
PS2
此外,我正在为handleResponse
寻找一些更方便的逻辑,因为这里的匹配非常模糊...
正常行为?
是的,这是绝对正常的行为。
您的 Actor 正在按照您发送它们的顺序接收 Count 增量,但 Futures 正在通过提交到底层线程池来完成。正是未来线程绑定的不确定顺序导致 println 执行乱序。
如何订购?
如果你想按顺序执行 Futures 那么这就是同步编程的同义词,即根本没有并发性。
我需要 volatile 吗?
Actor 的状态只能在 Actor 自身内部访问。这就是为什么 Actor 的用户永远不会得到一个实际的 Actor 对象,他们只会得到一个 ActorRef,例如val actorRef = actorSystem actorOf Props[Actor]
。这部分是为了确保 Actors 的用户永远无法更改 Actor 的状态,除非通过消息传递。来自 docs:
The good news is that Akka actors conceptually each have their own
light-weight thread, which is completely shielded from the rest of the
system. This means that instead of having to synchronize access using
locks you can just write your actor code without worrying about
concurrency at all.
因此,您不需要 volatile。
更方便的逻辑
为了更方便的逻辑我会推荐Agents
, which are a kind of typed Actor with a simpler message framework. From the docs:
import scala.concurrent.ExecutionContext.Implicits.global
import akka.agent.Agent
val agent = Agent(5)
val result = agent()
val result = agent.get
agent send 7
agent send (_ + 1)
读取是同步但瞬时的。写入是异步的。这意味着任何时候你做一个阅读你不必担心 Futures 因为内部价值 returns 立即。但一定要阅读文档,因为您可以使用排队逻辑来玩更复杂的技巧。
您方法中的真正问题不是异步性质,而是过于复杂的逻辑。
尽管 Ramon 的回答很漂亮,我 +1d,是的,有办法确保 akka 某些部分的秩序。正如我们从 the doc 中读到的那样,每个发送者-接收者对 有 消息排序保证。
这意味着对于两个参与者的每个单向通道,都可以保证消息将按发送顺序传递。
但是您用来处理答案的 Future
任务完成顺序没有这样的保证。从 ask
向原始发件人发送 Future
作为消息是很奇怪的。
您可以做的事情:
将您的 Increment
重新定义为
case class Increment(id: Int, requester: ActorRef) extends ActorMessage
所以处理者可以知道原始请求者
修改CounterActor
的接收为
def receive: Receive = {
case Count(id) ⇒ self ! Increment(id, sender)
case Increment(id, snd) ⇒ snd ! {
counter += 1
println(s"req_id=$id, counter=$counter")
counter
}
}
将您的 handleResponse
简化为
def handleResponse(future: Future[Any]): Unit = {
future.onComplete {
case Success(n: Int) => println(s" -> $n")
case Failure(t) => println(t.getMessage)
}
}
现在您可能会看到消息以相同的顺序收到。
我说可能因为在Future.onComplete
中处理仍然发生所以我们需要另一个参与者来确保顺序。
让我们定义附加消息
case object StartCounting
演员本身:
class SenderActor extends Actor {
val counter = system.actorOf(Props[CounterActor])
def receive: Actor.Receive = {
case n: Int => println(s" -> $n")
case StartCounting =>
counter ! Count(1)
counter ! Count(2)
counter ! Count(3)
counter ! Count(4)
counter ! Count(5)
}
}
在您的 main
中,您现在可以只写
val sender = system.actorOf(Props[SenderActor])
sender ! StartCounting
并丢弃 handleResponse
方法。
现在您肯定会看到您的消息处理顺序正确。
我们已经实现了没有单个 ask
和 .
的整个逻辑
所以神奇的规则是:将处理响应留给参与者,仅通过 ask
从他们那里获得最终结果。
请注意还有 forward
method 但这会创建代理角色,因此消息排序将再次被破坏。
我的演示应用程序很简单。这是一个演员:
class CounterActor extends Actor {
@volatile private[this] var counter = 0
def receive: PartialFunction[Any, Unit] = {
case Count(id) ⇒ sender ! self ? Increment(id)
case Increment(id) ⇒ sender ! {
counter += 1
println(s"req_id=$id, counter=$counter")
counter
}
}
}
主应用程序:
sealed trait ActorMessage
case class Count(id: Int = 0) extends ActorMessage
case class Increment(id: Int) extends ActorMessage
object CountingApp extends App {
// Get incremented counter
val future0 = counter ? Count(1)
val future1 = counter ? Count(2)
val future2 = counter ? Count(3)
val future3 = counter ? Count(4)
val future4 = counter ? Count(5)
// Handle response
handleResponse(future0)
handleResponse(future1)
handleResponse(future2)
handleResponse(future3)
handleResponse(future4)
// Bye!
exit()
}
我的经纪人:
def handleResponse(future: Future[Any]): Unit = {
future.onComplete {
case Success(f) => f.asInstanceOf[Future[Any]].onComplete {
case x => x match {
case Success(n) => println(s" -> $n")
case Failure(t) => println(s" -> ${t.getMessage}")
}
}
case Failure(t) => println(t.getMessage)
}
}
如果我 运行 应用我会看到下一个输出:
req_id=1, counter=1
req_id=2, counter=2
req_id=3, counter=3
req_id=4, counter=4
req_id=5, counter=5
-> 4
-> 1
-> 5
-> 3
-> 2
处理响应的顺序是随机的。这是正常行为吗?如果没有,如何订购?
PS
actor 中需要 volatile var 吗?
PS2
此外,我正在为handleResponse
寻找一些更方便的逻辑,因为这里的匹配非常模糊...
正常行为?
是的,这是绝对正常的行为。
您的 Actor 正在按照您发送它们的顺序接收 Count 增量,但 Futures 正在通过提交到底层线程池来完成。正是未来线程绑定的不确定顺序导致 println 执行乱序。
如何订购?
如果你想按顺序执行 Futures 那么这就是同步编程的同义词,即根本没有并发性。
我需要 volatile 吗?
Actor 的状态只能在 Actor 自身内部访问。这就是为什么 Actor 的用户永远不会得到一个实际的 Actor 对象,他们只会得到一个 ActorRef,例如val actorRef = actorSystem actorOf Props[Actor]
。这部分是为了确保 Actors 的用户永远无法更改 Actor 的状态,除非通过消息传递。来自 docs:
The good news is that Akka actors conceptually each have their own light-weight thread, which is completely shielded from the rest of the system. This means that instead of having to synchronize access using locks you can just write your actor code without worrying about concurrency at all.
因此,您不需要 volatile。
更方便的逻辑
为了更方便的逻辑我会推荐Agents
, which are a kind of typed Actor with a simpler message framework. From the docs:
import scala.concurrent.ExecutionContext.Implicits.global
import akka.agent.Agent
val agent = Agent(5)
val result = agent()
val result = agent.get
agent send 7
agent send (_ + 1)
读取是同步但瞬时的。写入是异步的。这意味着任何时候你做一个阅读你不必担心 Futures 因为内部价值 returns 立即。但一定要阅读文档,因为您可以使用排队逻辑来玩更复杂的技巧。
您方法中的真正问题不是异步性质,而是过于复杂的逻辑。
尽管 Ramon 的回答很漂亮,我 +1d,是的,有办法确保 akka 某些部分的秩序。正如我们从 the doc 中读到的那样,每个发送者-接收者对 有 消息排序保证。
这意味着对于两个参与者的每个单向通道,都可以保证消息将按发送顺序传递。
但是您用来处理答案的 Future
任务完成顺序没有这样的保证。从 ask
向原始发件人发送 Future
作为消息是很奇怪的。
您可以做的事情:
将您的 Increment
重新定义为
case class Increment(id: Int, requester: ActorRef) extends ActorMessage
所以处理者可以知道原始请求者
修改CounterActor
的接收为
def receive: Receive = {
case Count(id) ⇒ self ! Increment(id, sender)
case Increment(id, snd) ⇒ snd ! {
counter += 1
println(s"req_id=$id, counter=$counter")
counter
}
}
将您的 handleResponse
简化为
def handleResponse(future: Future[Any]): Unit = {
future.onComplete {
case Success(n: Int) => println(s" -> $n")
case Failure(t) => println(t.getMessage)
}
}
现在您可能会看到消息以相同的顺序收到。
我说可能因为在Future.onComplete
中处理仍然发生所以我们需要另一个参与者来确保顺序。
让我们定义附加消息
case object StartCounting
演员本身:
class SenderActor extends Actor {
val counter = system.actorOf(Props[CounterActor])
def receive: Actor.Receive = {
case n: Int => println(s" -> $n")
case StartCounting =>
counter ! Count(1)
counter ! Count(2)
counter ! Count(3)
counter ! Count(4)
counter ! Count(5)
}
}
在您的 main
中,您现在可以只写
val sender = system.actorOf(Props[SenderActor])
sender ! StartCounting
并丢弃 handleResponse
方法。
现在您肯定会看到您的消息处理顺序正确。
我们已经实现了没有单个 ask
和
所以神奇的规则是:将处理响应留给参与者,仅通过 ask
从他们那里获得最终结果。
请注意还有 forward
method 但这会创建代理角色,因此消息排序将再次被破坏。