如何用 Akka 演员解决这个问题?

How to solve this with Akka actors?

不知道如何命名此线程,但会尝试用几行来解释问题。

我有一个命令需要计算所需日期范围内的价格。要计算它,系统需要单独获取每天的价格(数据库、配置、缓存,无论从哪里获取)。

我的建议是有一个 PriceRangeActor,它将有一个 DailyPriceActor 池,并向它们发送像 CalculateDailyPrice 这样的命令。

但是如何 assemble PriceRanceActor 中的所有数据?

1。 有一些带有复杂键的大地图闻起来很臭。那么如何确定范围是否已完全计算?有更简单的方法吗?

2。 为每个命令创建新的 PriceRangeActor 并使用询问模式查询 DailyPriceActors 列表?

因为您没有使用任何消息 passing/queuing 我建议使用 Futures 而不是 Actors 作为您的并发抽象机制。 blog entry 提出了一个非常有说服力的论点,即 Actors 用于状态而 Futures 用于计算。

使用 Futures 或 Actor ?(这是一个 Future),您可以使用 Future.sequence 将所有单独的查询 Futures 捆绑在一起成为一个 Future,该 Future 仅完成一次所有子 -查询完成。

使用期货(推荐)

import scala.concurrent.Future

object Foo extends App {

  type Date = Int
  type Prices = Seq[Float]
  type PriceMap = Map[Date, Prices]

  //expensive query function
  def fetchPrices(date : Date) : Prices = ???

  //the Dates to query Prices for
  val datesToQuery : Seq[Date] = ???

  import scala.concurrent.ExecutionContext.Implicits._

  def concurrentQuery(date : Date) : Future[Prices] = Future {fetchPrices(date)}      

  //launches a Future per date query, D Dates => D Futures
  //Future.sequence converts the D Futures into 1 Future
  val dates2PricesFuture : Future[PriceMap] = 
    Future.sequence(datesToQuery map concurrentQuery)
          .map(datesToQuery zip _)  
          .map(_.toMap)      

  dates2PricesFuture onSuccess { case priceMap : PriceMap =>
    //process the price data which is now completely available
  }

}//end object Foo

使用演员

import scala.concurrent.Future
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

object Foo extends App {          
  type Date = Int
  type Prices = Seq[Float]
  type PriceMap = Map[Date, Prices]

  def fetchPrices(date : Date) : Prices = ???

  val datesToQuery : Seq[Date] = ???

  class QueryActor() extends Actor {
    def receive = { case date : Date => sender ! fetchPrices(date) }
  }

  implicit val as = ActorSystem()
  implicit val queryTimeout = Timeout(1000)

  import as.dispatcher

  def concurrentQuery(date : Date) : Future[Prices] =    
    ask(as actorOf Props[QueryActor],date).mapTo[Prices]

  val dates2PricesFuture : Future[PriceMap] = 
        Future.sequence(datesToQuery map concurrentQuery)
              .map(datesToQuery zip _)  
              .map(_.toMap)

  dates2PricesFuture onSuccess ... //same as first example

}//end object Foo