如何用 Akka 演员解决这个问题?
How to solve this with Akka actors?
不知道如何命名此线程,但会尝试用几行来解释问题。
我有一个命令需要计算所需日期范围内的价格。要计算它,系统需要单独获取每天的价格(数据库、配置、缓存,无论从哪里获取)。
我的建议是有一个 PriceRangeActor,它将有一个 DailyPriceActor 池,并向它们发送像 CalculateDailyPrice 这样的命令。
但是如何 assemble PriceRanceActor 中的所有数据?
1。
有一些带有复杂键的大地图闻起来很臭。那么如何确定范围是否已完全计算?有更简单的方法吗?
2。
为每个命令创建新的 PriceRangeActor 并使用询问模式查询 DailyPriceActors 列表?
因为您没有使用任何消息 passing/queuing 我建议使用 Futures 而不是 Actors 作为您的并发抽象机制。 blog entry 提出了一个非常有说服力的论点,即 Actors 用于状态而 Futures 用于计算。
使用 Futures 或 Actor ?
(这是一个 Future),您可以使用 Future.sequence
将所有单独的查询 Futures 捆绑在一起成为一个 Future,该 Future 仅完成一次所有子 -查询完成。
使用期货(推荐)
import scala.concurrent.Future
object Foo extends App {
type Date = Int
type Prices = Seq[Float]
type PriceMap = Map[Date, Prices]
//expensive query function
def fetchPrices(date : Date) : Prices = ???
//the Dates to query Prices for
val datesToQuery : Seq[Date] = ???
import scala.concurrent.ExecutionContext.Implicits._
def concurrentQuery(date : Date) : Future[Prices] = Future {fetchPrices(date)}
//launches a Future per date query, D Dates => D Futures
//Future.sequence converts the D Futures into 1 Future
val dates2PricesFuture : Future[PriceMap] =
Future.sequence(datesToQuery map concurrentQuery)
.map(datesToQuery zip _)
.map(_.toMap)
dates2PricesFuture onSuccess { case priceMap : PriceMap =>
//process the price data which is now completely available
}
}//end object Foo
使用演员
import scala.concurrent.Future
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
object Foo extends App {
type Date = Int
type Prices = Seq[Float]
type PriceMap = Map[Date, Prices]
def fetchPrices(date : Date) : Prices = ???
val datesToQuery : Seq[Date] = ???
class QueryActor() extends Actor {
def receive = { case date : Date => sender ! fetchPrices(date) }
}
implicit val as = ActorSystem()
implicit val queryTimeout = Timeout(1000)
import as.dispatcher
def concurrentQuery(date : Date) : Future[Prices] =
ask(as actorOf Props[QueryActor],date).mapTo[Prices]
val dates2PricesFuture : Future[PriceMap] =
Future.sequence(datesToQuery map concurrentQuery)
.map(datesToQuery zip _)
.map(_.toMap)
dates2PricesFuture onSuccess ... //same as first example
}//end object Foo
不知道如何命名此线程,但会尝试用几行来解释问题。
我有一个命令需要计算所需日期范围内的价格。要计算它,系统需要单独获取每天的价格(数据库、配置、缓存,无论从哪里获取)。
我的建议是有一个 PriceRangeActor,它将有一个 DailyPriceActor 池,并向它们发送像 CalculateDailyPrice 这样的命令。
但是如何 assemble PriceRanceActor 中的所有数据?
1。 有一些带有复杂键的大地图闻起来很臭。那么如何确定范围是否已完全计算?有更简单的方法吗?
2。 为每个命令创建新的 PriceRangeActor 并使用询问模式查询 DailyPriceActors 列表?
因为您没有使用任何消息 passing/queuing 我建议使用 Futures 而不是 Actors 作为您的并发抽象机制。 blog entry 提出了一个非常有说服力的论点,即 Actors 用于状态而 Futures 用于计算。
使用 Futures 或 Actor ?
(这是一个 Future),您可以使用 Future.sequence
将所有单独的查询 Futures 捆绑在一起成为一个 Future,该 Future 仅完成一次所有子 -查询完成。
使用期货(推荐)
import scala.concurrent.Future
object Foo extends App {
type Date = Int
type Prices = Seq[Float]
type PriceMap = Map[Date, Prices]
//expensive query function
def fetchPrices(date : Date) : Prices = ???
//the Dates to query Prices for
val datesToQuery : Seq[Date] = ???
import scala.concurrent.ExecutionContext.Implicits._
def concurrentQuery(date : Date) : Future[Prices] = Future {fetchPrices(date)}
//launches a Future per date query, D Dates => D Futures
//Future.sequence converts the D Futures into 1 Future
val dates2PricesFuture : Future[PriceMap] =
Future.sequence(datesToQuery map concurrentQuery)
.map(datesToQuery zip _)
.map(_.toMap)
dates2PricesFuture onSuccess { case priceMap : PriceMap =>
//process the price data which is now completely available
}
}//end object Foo
使用演员
import scala.concurrent.Future
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
object Foo extends App {
type Date = Int
type Prices = Seq[Float]
type PriceMap = Map[Date, Prices]
def fetchPrices(date : Date) : Prices = ???
val datesToQuery : Seq[Date] = ???
class QueryActor() extends Actor {
def receive = { case date : Date => sender ! fetchPrices(date) }
}
implicit val as = ActorSystem()
implicit val queryTimeout = Timeout(1000)
import as.dispatcher
def concurrentQuery(date : Date) : Future[Prices] =
ask(as actorOf Props[QueryActor],date).mapTo[Prices]
val dates2PricesFuture : Future[PriceMap] =
Future.sequence(datesToQuery map concurrentQuery)
.map(datesToQuery zip _)
.map(_.toMap)
dates2PricesFuture onSuccess ... //same as first example
}//end object Foo