排序akka路由器消息
Sequencing akka router messages
使用 akka 路由器,我有两个不同的操作要执行。但是当我传递消息时,它们是重叠的。这是我的代码。
class Master extends Actor {
import context._
val numRoutees = 3
val router: ActorRef = actorOf (RoundRobinPool (numRoutees).props(Props[Worker]), "router")
// broadcasts GetString() and receives a random string from each routee
def stringMessages(replies: Set[String] = Set()): Receive = {
case GetString() =>
router ! Broadcast(GetString()) // g
case reply: String =>
val updatedReplies = replies + reply
if (updatedReplies.size == numRoutees) {
println("result = " + updatedReplies.mkString("[", ",", "]"))
}
become(stringMessages(updatedReplies))
case GetInteger() =>
become(intMessages())
// self ! createArray() // h // <- uncommenting this results in an infinte loop
case _ => println("stringMessages: no matches")
}
// broadcasts GetInteger and receives a random integer from each routee
def intMessages(ints: Set[Int] = Set()): Receive = {
case GetInteger() =>
router ! Broadcast(GetInteger()) // e
case n: Int =>
val updatedInts = ints + n
if (updatedInts.size == numRoutees) {
println("result = " + updatedInts.mkString("[", ",", "]"))
}
become(intMessages(updatedInts))
case GetString() =>
become(stringMessages())
self ! GetString() // f
case _ => println("intMessages: no matches")
}
override def receive: Receive =
{
case GetString() =>
become(stringMessages())
self ! GetString() // c
case GetInteger() =>
become(intMessages())
self ! GetInteger() // d
case _ => println("root doesn't match")
}
}
object MasterTest extends App {
val system = ActorSystem ("ActorSystem")
val actor = system.actorOf(Props[Master], "root")
actor ! GetInteger() // a
actor ! GetString() // b
}
对于一些调试语句,我理解执行顺序可能是 a -> b -> f -> g
。 (注意代码中注释的语句的 ID)。该代码没有按照我的预期执行。输出是
result = [a,b,c]
如何让它们按照a -> d-> e -> b -> f -> g
的顺序执行。如果我添加一个 Thread.sleep
like
actor ! GetInteger() // a
Thread.sleep(3000)
actor ! GetString() // b
我得到了预期的输出,即
result = [0,4,6] // random integers
result = [a,b,c] // random strings
如何让 actor 消息排队,使得前一个消息执行完后才执行新消息。什么是更好的方法来实现我在 become()
上所做的事情?如果我想在代码中添加更多状态(例如此处的 GetInteger
和 GetString
),代码会变得过于冗余,无法使用 become(newState)
将状态从一个更改为另一个。
此外,如果我取消对 h
的注释,代码将变成 a -> b -> f -> d -> h -> f -> h -> f -> ...
的无限循环。所以我明白这首先不是正确的实现。
一个想法是在单个 Receive
行为中对 String
回复和 Int
回复进行编码。例如:
case object GetInteger
case object GetString
// ...
def handleMessages(intReplies: Set[Int] = Set(), strReplies: Set[String] = Set()): Receive = {
case GetInteger =>
router ! Broadcast(GetInteger)
case GetString =>
router ! Broadcast(GetString)
case i: Int =>
val updatedInts = intReplies + i
if (updatedInts.size == numRoutees) {
println("result = " + updatedInts.mkString("[", ",", "]"))
}
become(handleMessages(updatedInts, strReplies))
case str: String =>
val updatedStrings = strReplies + str
if (updatedStrings.size == numRoutees) {
println("result = " + updatedStrings.mkString("[", ",", "]"))
}
become(handleMessages(intReplies, updatedStrings))
case x =>
println("Not an Int or String: " + x)
}
def receive = handleMessages
请注意,我将 GetInteger
和 GetString
更改为 case 对象而不是 case 类,因为它们没有参数。这样做可以让您在末尾去掉括号(即,您可以使用 GetInteger
而不是 GetInteger()
)。
此外,如果您担心顺序,请考虑使用有序集合,例如 scala.collection.immutable.Seq
而不是 Set
(无序)。
使用 akka 路由器,我有两个不同的操作要执行。但是当我传递消息时,它们是重叠的。这是我的代码。
class Master extends Actor {
import context._
val numRoutees = 3
val router: ActorRef = actorOf (RoundRobinPool (numRoutees).props(Props[Worker]), "router")
// broadcasts GetString() and receives a random string from each routee
def stringMessages(replies: Set[String] = Set()): Receive = {
case GetString() =>
router ! Broadcast(GetString()) // g
case reply: String =>
val updatedReplies = replies + reply
if (updatedReplies.size == numRoutees) {
println("result = " + updatedReplies.mkString("[", ",", "]"))
}
become(stringMessages(updatedReplies))
case GetInteger() =>
become(intMessages())
// self ! createArray() // h // <- uncommenting this results in an infinte loop
case _ => println("stringMessages: no matches")
}
// broadcasts GetInteger and receives a random integer from each routee
def intMessages(ints: Set[Int] = Set()): Receive = {
case GetInteger() =>
router ! Broadcast(GetInteger()) // e
case n: Int =>
val updatedInts = ints + n
if (updatedInts.size == numRoutees) {
println("result = " + updatedInts.mkString("[", ",", "]"))
}
become(intMessages(updatedInts))
case GetString() =>
become(stringMessages())
self ! GetString() // f
case _ => println("intMessages: no matches")
}
override def receive: Receive =
{
case GetString() =>
become(stringMessages())
self ! GetString() // c
case GetInteger() =>
become(intMessages())
self ! GetInteger() // d
case _ => println("root doesn't match")
}
}
object MasterTest extends App {
val system = ActorSystem ("ActorSystem")
val actor = system.actorOf(Props[Master], "root")
actor ! GetInteger() // a
actor ! GetString() // b
}
对于一些调试语句,我理解执行顺序可能是 a -> b -> f -> g
。 (注意代码中注释的语句的 ID)。该代码没有按照我的预期执行。输出是
result = [a,b,c]
如何让它们按照a -> d-> e -> b -> f -> g
的顺序执行。如果我添加一个 Thread.sleep
like
actor ! GetInteger() // a
Thread.sleep(3000)
actor ! GetString() // b
我得到了预期的输出,即
result = [0,4,6] // random integers
result = [a,b,c] // random strings
如何让 actor 消息排队,使得前一个消息执行完后才执行新消息。什么是更好的方法来实现我在 become()
上所做的事情?如果我想在代码中添加更多状态(例如此处的 GetInteger
和 GetString
),代码会变得过于冗余,无法使用 become(newState)
将状态从一个更改为另一个。
此外,如果我取消对 h
的注释,代码将变成 a -> b -> f -> d -> h -> f -> h -> f -> ...
的无限循环。所以我明白这首先不是正确的实现。
一个想法是在单个 Receive
行为中对 String
回复和 Int
回复进行编码。例如:
case object GetInteger
case object GetString
// ...
def handleMessages(intReplies: Set[Int] = Set(), strReplies: Set[String] = Set()): Receive = {
case GetInteger =>
router ! Broadcast(GetInteger)
case GetString =>
router ! Broadcast(GetString)
case i: Int =>
val updatedInts = intReplies + i
if (updatedInts.size == numRoutees) {
println("result = " + updatedInts.mkString("[", ",", "]"))
}
become(handleMessages(updatedInts, strReplies))
case str: String =>
val updatedStrings = strReplies + str
if (updatedStrings.size == numRoutees) {
println("result = " + updatedStrings.mkString("[", ",", "]"))
}
become(handleMessages(intReplies, updatedStrings))
case x =>
println("Not an Int or String: " + x)
}
def receive = handleMessages
请注意,我将 GetInteger
和 GetString
更改为 case 对象而不是 case 类,因为它们没有参数。这样做可以让您在末尾去掉括号(即,您可以使用 GetInteger
而不是 GetInteger()
)。
此外,如果您担心顺序,请考虑使用有序集合,例如 scala.collection.immutable.Seq
而不是 Set
(无序)。