排序akka路由器消息

Sequencing akka router messages

使用 akka 路由器,我有两个不同的操作要执行。但是当我传递消息时,它们是重叠的。这是我的代码。

class Master extends Actor {

    import context._

    val numRoutees = 3
    val router: ActorRef = actorOf (RoundRobinPool (numRoutees).props(Props[Worker]), "router")

    // broadcasts GetString() and receives a random string from each routee
    def stringMessages(replies: Set[String] = Set()): Receive = {
        case GetString() =>
            router ! Broadcast(GetString())     // g
        case reply: String =>
            val updatedReplies = replies + reply
            if (updatedReplies.size == numRoutees) {
                println("result = " + updatedReplies.mkString("[", ",", "]"))
            }
            become(stringMessages(updatedReplies))

        case GetInteger() =>
            become(intMessages())
            //    self ! createArray()  // h      // <- uncommenting this results in an infinte loop

        case _ => println("stringMessages: no matches")
    }

    // broadcasts GetInteger and receives a random integer from each routee
    def intMessages(ints: Set[Int] = Set()): Receive = {
        case GetInteger() =>
            router ! Broadcast(GetInteger())    // e
        case n: Int =>
            val updatedInts = ints + n
            if (updatedInts.size == numRoutees) {
                println("result = " + updatedInts.mkString("[", ",", "]"))
            }
            become(intMessages(updatedInts))

        case GetString() =>
            become(stringMessages())
            self ! GetString()                  // f

        case _ => println("intMessages: no matches")
    }

    override def receive: Receive =
    {
        case GetString() =>
            become(stringMessages())
            self ! GetString()      // c
        case GetInteger() =>
            become(intMessages())
            self ! GetInteger()     // d
        case _ => println("root doesn't match")
    }
}


object MasterTest extends App {
    val system = ActorSystem ("ActorSystem")
    val actor = system.actorOf(Props[Master], "root")

    actor ! GetInteger()        // a
    actor ! GetString()         // b
}

对于一些调试语句,我理解执行顺序可能是 a -> b -> f -> g。 (注意代码中注释的语句的 ID)。该代码没有按照我的预期执行。输出是

result = [a,b,c]

如何让它们按照a -> d-> e -> b -> f -> g的顺序执行。如果我添加一个 Thread.sleep like

actor ! GetInteger()        // a
Thread.sleep(3000)
actor ! GetString()         // b

我得到了预期的输出,即

result = [0,4,6]    // random integers
result = [a,b,c]    // random strings

如何让 actor 消息排队,使得前一个消息执行完后才执行新消息。什么是更好的方法来实现我在 become() 上所做的事情?如果我想在代码中添加更多状态(例如此处的 GetIntegerGetString),代码会变得过于冗余,无法使用 become(newState) 将状态从一个更改为另一个。

此外,如果我取消对 h 的注释,代码将变成 a -> b -> f -> d -> h -> f -> h -> f -> ... 的无限循环。所以我明白这首先不是正确的实现。

一个想法是在单个 Receive 行为中对 String 回复和 Int 回复进行编码。例如:

case object GetInteger
case object GetString

// ...

def handleMessages(intReplies: Set[Int] = Set(), strReplies: Set[String] = Set()): Receive = {
  case GetInteger =>
    router ! Broadcast(GetInteger)
  case GetString =>
    router ! Broadcast(GetString)
  case i: Int =>
    val updatedInts = intReplies + i
    if (updatedInts.size == numRoutees) {
      println("result = " + updatedInts.mkString("[", ",", "]"))
    }
    become(handleMessages(updatedInts, strReplies))
  case str: String =>
    val updatedStrings = strReplies + str
    if (updatedStrings.size == numRoutees) {
      println("result = " + updatedStrings.mkString("[", ",", "]"))
    }
    become(handleMessages(intReplies, updatedStrings))
  case x =>
    println("Not an Int or String: " + x)
}

def receive = handleMessages

请注意,我将 GetIntegerGetString 更改为 case 对象而不是 case 类,因为它们没有参数。这样做可以让您在末尾去掉括号(即,您可以使用 GetInteger 而不是 GetInteger())。

此外,如果您担心顺序,请考虑使用有序集合,例如 scala.collection.immutable.Seq 而不是 Set(无序)。