Scala Akka Consumer/Producer: Return Value
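Problem Statement
Suppose I have a file of sentences that is processed line by line. In my case, I need to extract named entities (persons, organizations, etc.) from these lines. Unfortunately, the tagger is very slow. So I decided to parallelize the computation: each line can be processed independently, and the results are collected in one central place.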
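Because every line can be handled independently, the fan-out/collect idea itself does not depend on actors. As a point of comparison, here is a minimal sketch using only the Scala standard library's `Future`s. The `tag` function is a hypothetical stand-in for the slow tagger (it reuses the lower-casing tokenizer from the Consumer in the code that follows), and the object name `FutureBaseline` is illustrative only.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureBaseline {
  // Hypothetical stand-in for the slow named-entity tagger; here it just
  // lower-cases and splits on spaces, like the Consumer's tokenize
  def tag(line: String): List[String] =
    line.split(" ").map(_.toLowerCase).toList

  // Fan out: one Future per line, run on the global thread pool.
  // Collect: Future.traverse gathers the per-line results in input order.
  def processAll(lines: Seq[String]): Future[List[String]] =
    Future.traverse(lines.toList)(line => Future(tag(line))).map(_.flatten)

  def main(args: Array[String]): Unit = {
    val lines = List("Alice works at Acme", "Bob lives in Paris")
    // Blocking once, at the very edge of the program, is acceptable here
    println(Await.result(processAll(lines), 10.seconds))
  }
}
```

The global execution context sizes its pool to the number of cores, which suits CPU-bound tagging; the actor-based design below gives more control over the number of workers and the dispatcher.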
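Current Approach
My current approach uses a single-producer, multiple-consumer design. I am relatively new to Akka, but I think my problem fits its capabilities well. Let me show you some code: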
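Producer
The Producer reads the file line by line and sends each line to the Consumer. Once the number of processed lines reaches the total number of lines, it propagates the result back to WordCount.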
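import akka.actor.{Actor, ActorLogging, ActorRef}
import scala.io.Source

// Messages (shapes inferred from how they are used in this code)
case class StartProcessing()
case class Sentence(line: String)
case class SentenceProcessed(tokens: List[String])

class Producer(consumers: ActorRef) extends Actor with ActorLogging {
  var master: Option[ActorRef] = None
  var result = List.empty[String]
  var totalLines = 0
  var linesProcessed = 0

  override def receive = {
    case StartProcessing() =>
      master = Some(sender())
      val source = Source.fromFile("sent.txt", "utf-8")
      try {
        source.getLines().foreach { line =>
          consumers ! Sentence(line)
          totalLines += 1
        }
      } finally source.close()
      // Don't stop here: the consumers reply to this actor, so stopping now
      // would send every SentenceProcessed to dead letters.
      if (totalLines == 0) finish()
    case SentenceProcessed(list) =>
      linesProcessed += 1
      result :::= list
      // totalLines is already final: an actor handles one message at a time.
      // If we are done, we can propagate the result to the creator.
      if (linesProcessed == totalLines) finish()
    case _ => log.error("message not recognized")
  }

  private def finish(): Unit = {
    master.foreach(_ ! result)
    context.stop(self)
  }
}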
Consumer
import akka.actor.{Actor, ActorLogging}

class Consumer extends Actor with ActorLogging {
  def tokenize(line: String): Seq[String] =
    line.split(" ").map(_.toLowerCase).toSeq

  override def receive = {
    case Sentence(sent) =>
      // Assume: this stands in for the expensive tagging step
      val tokens = tokenize(sent)
      // Reply to whoever sent the sentence (the Producer)
      sender() ! SentenceProcessed(tokens.toList)
    case _ => log.error("message not recognized")
  }
}
WordCount (Master)
import akka.actor.{Actor, PoisonPill, Props, Terminated}
import akka.routing.{Broadcast, FromConfig}

class WordCount extends Actor {
  val consumers = context.actorOf(Props[Consumer].
    withRouter(FromConfig()).
    withDispatcher("consumer-dispatcher"), "consumers")
  val producer = context.actorOf(Props(new Producer(consumers)), "producer")
  context.watch(consumers)
  context.watch(producer)
  // The Producer does nothing until it receives this message
  producer ! StartProcessing()

  def receive = {
    // Producer finished: stop every consumer behind the router
    case Terminated(`producer`) => consumers ! Broadcast(PoisonPill)
    // Router and all consumers are gone: shut down the actor system
    case Terminated(`consumers`) => context.system.shutdown()
  }
}

object WordCount {
  def getActor() = new WordCount
  // Builds the HOCON config for the consumer router and its dispatcher
  def getConfig(routerType: String, dispatcherType: String)(numConsumers: Int) = s"""
    akka.actor.deployment {
      /WordCount/consumers {
        router = $routerType
        nr-of-instances = $numConsumers
        dispatcher = consumer-dispatcher
      }
    }
    consumer-dispatcher {
      type = $dispatcherType
      executor = "fork-join-executor"
    }"""
}
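The WordCount actor is responsible for creating the other actors. Once the Consumers are finished, the Producer sends a message containing all tokens. But how do I propagate that message further, and how do I receive it and wait for it? Perhaps the architecture with a third actor, WordCount, is wrong.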
Main Routine
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

case class Run(name: String, actor: () => Actor, config: (Int) => String)

object Main extends App {
  val run = Run("push_implementation", WordCount.getActor _,
    WordCount.getConfig("balancing-pool", "Dispatcher") _)

  def execute(run: Run, numConsumers: Int) = {
    val config = ConfigFactory.parseString(run.config(numConsumers))
    val system = ActorSystem("Counting", ConfigFactory.load(config))
    val startTime = System.currentTimeMillis
    system.actorOf(Props(run.actor()), "WordCount")
    /*
     How to get the result here?!
     */
    system.awaitTermination()
    System.currentTimeMillis - startTime
  }

  execute(run, 4)
}
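Question
As you can see, the actual problem is propagating the result back to the Main routine. Can you tell me how to do this properly? A related problem is how to wait until the consumers have finished before producing the result. I briefly skimmed the Futures section of the Akka documentation, but the whole system is a bit overwhelming for a beginner. Something like val future = actor ? message seems appropriate, but I am not sure how to do it. Also, the WordCount actor adds extra complexity. Maybe there is a solution that does not need this actor?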
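Consider using the Akka Aggregator Pattern. It takes care of the low-level primitives (watching actors, poison pills, etc.), so you can focus on managing state.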
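To make "managing state" concrete, here is a hypothetical, framework-free sketch of what an aggregator for this problem has to track: how many replies are expected, how many have arrived, and the tokens collected so far. It is not the API of Akka's Aggregator pattern support, and the names `Aggregation`, `start`, `add` and `isComplete` are illustrative assumptions. An actor would keep such a value in a field and reply to the original requester once `isComplete` is true.

```scala
// Hypothetical immutable aggregation state; an actor would hold one in a
// var (or via context.become) and reply once isComplete becomes true
final case class Aggregation(expected: Int, received: Int, tokens: List[String]) {
  // Record one SentenceProcessed-style reply, keeping arrival order
  def add(reply: List[String]): Aggregation =
    copy(received = received + 1, tokens = tokens ::: reply)

  // Done once every dispatched line has been answered; an empty
  // file (expected == 0) is complete immediately
  def isComplete: Boolean = received >= expected
}

object Aggregation {
  def start(expected: Int): Aggregation = Aggregation(expected, 0, Nil)
}
```

For example, `Aggregation.start(2).add(List("alice")).add(List("bob"))` is complete and holds `List("alice", "bob")`. Keeping the state immutable makes the completion rule, including the empty-file case, easy to unit-test without an actor system.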
Your call to system.actorOf() returns an ActorRef, but you are not using it. You should ask that actor for the result, like this:
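import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await, scala.concurrent.duration._
implicit val timeout = Timeout(5.seconds) // a slow tagger may need much longer
val wCount = system.actorOf(Props(run.actor()), "WordCount")
val answer = Await.result((wCount ? "sent.txt").mapTo[List[String]], timeout.duration) // throws on timeout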
This means your WordCount class needs a receive method that accepts a String message. That code should aggregate the results and send them to sender(), like this:
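import akka.actor.Actor

class WordCount extends Actor {
  def receive: Receive = {
    case filename: String =>
      // Do all of your processing here, using filename,
      // and aggregate the tokens into `results`.
      // If the results only arrive later (as replies from other actors),
      // store sender() in a field first: it always refers to the sender
      // of the message currently being processed.
      sender() ! results
  }
}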
Also, instead of blocking on the result with Await as above, you can use one of the techniques for working with Futures.
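A minimal sketch of that non-blocking style, assuming the reply has already been turned into a `Future[List[String]]` (for example with `(wCount ? "sent.txt").mapTo[List[String]]`). To keep it self-contained, the hypothetical `askForTokens` fakes that future with plain Scala, and `report`'s `onDone` parameter is a hypothetical hook where `Main` would shut the actor system down. The result is transformed with `map` and consumed in an `onComplete` callback instead of `Await.result`.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object NonBlockingResult {
  // Hypothetical stand-in for (wCount ? "sent.txt").mapTo[List[String]]
  def askForTokens(filename: String): Future[List[String]] =
    Future(List("alice", "acme", "alice"))

  // Transform the eventual result without blocking: count each token
  def tokenCounts(tokens: Future[List[String]]): Future[Map[String, Int]] =
    tokens.map(_.groupBy(identity).map { case (token, all) => token -> all.size })

  // Register what happens once the answer (or a failure) arrives.
  // onDone is a hypothetical hook, e.g. () => system.shutdown() in Main.
  def report(counts: Future[Map[String, Int]], onDone: () => Unit): Unit =
    counts.onComplete {
      case Success(c) =>
        println(s"token counts: $c")
        onDone()
      case Failure(e) =>
        println(s"processing failed: $e")
        onDone()
    }
}
```

In the question's `Main`, a call along the lines of `report(tokenCounts(...the ask future...), () => system.shutdown())` would take the place of `Await.result`; failures such as an ask timeout then arrive in the `Failure` branch instead of being thrown.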