Scala 中的尾递归函数轮询
Tail-recursive, functional polling in Scala
我有一个通过网络查找 Elephant
的函数,returning Future[Option[Elephant]]
。 return 值是一个 Future
,因此当网络调用异步发生时,该函数可以立即 return。它包含一个 Option
,其中 None
表示尚未可用,而 Some
表示已找到大象:
def checkForElephant : Future[Option[Elephant]] = ???
我想做的是编写一个名为 pollForElephant
.
的函数
def pollForElephant : Future[Elephant] = ???
这个函数应该 return 一个 Future
超过一个进程,该进程将调用 checkForElephant
并且如果在第一次检查中找到一个元素就会很快成功,但此后每 10 次检查一次直到找到 Elephant
秒,即使没有大象并且它必须永远尝试。
最简单的方法就是强制同步检查,在 Future
域之外写一个递归函数来轮询,然后在整个事件上创建一个 Future
:
import scala.annotation.tailrec
import scala.concurrent.{Await,Future,blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
class Elephant;
def checkForElephant : Future[Option[Elephant]] = ???
def synchronousCheckForElephant : Option[Elephant] = blocking {
Await.result( checkForElephant, Duration.Inf )
}
@tailrec
def poll( last : Option[Elephant] ) : Elephant = {
last match {
case Some( elephant ) => elephant
case None => {
blocking {
Thread.sleep( 10.seconds.toMillis )
}
poll( synchronousCheckForElephant )
}
}
}
def pollForElephant : Future[Elephant] = Future {
poll( synchronousCheckForElephant )
}
这似乎非常不雅,从 Future
域开始,强制同步,然后返回。我认为我应该能够从 Future
开始做所有事情。所以,我尝试了这个:
import scala.annotation.tailrec
import scala.concurrent.{Await,Future,blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
class Elephant;
def checkForElephant : Future[Option[Elephant]] = ???
// oops! this is not @tailrec
def poll( last : Future[Option[Elephant]] ) : Future[Elephant] = {
last.flatMap { mbElephant =>
mbElephant match {
case Some( elephant ) => Future.successful( elephant )
case None => {
blocking {
Thread.sleep( 10.seconds.toMillis )
}
poll( checkForElephant )
}
}
}
}
def pollForElephant : Future[Elephant] = poll( checkForElephant )
不幸的是,正如上面的评论所说,poll(...)
函数不是尾递归的。大象可能需要很长时间才能到达,我应该无限期地等待,但堆栈可能会爆炸。
整个事情感觉有点奇怪。我应该回到更容易推理的同步方法吗?在 Future
?
中,有什么安全的方法可以实现我的意思吗?
我同意@PH88 的评论:您不需要调用尾递归,因为在 flatMap
中的 checkForElephant
中您创建了一个新的 Future
,因此创建了一个新的堆栈。这是一个简单的代码,我试图模拟你的 checkForElephant
:
type Elephant = String
val rnd = new Random()
def checkForElephant: Future[Option[Elephant]] = Future({
val success = rnd.nextDouble() < 0.2
println(s"Call to checkForElephant => $success")
if (success) Some(Thread.currentThread().getStackTrace().mkString("\n")) else None
})
def poll(last: Future[Option[Elephant]]): Future[Elephant] = {
last flatMap {
case Some(elephant) => Future.successful(elephant)
case None => {
blocking {
println("Sleeping")
Thread.sleep(100.millisecond.toMillis)
}
poll(checkForElephant)
}
}
}
def pollForElephant: Future[Elephant] = poll(checkForElephant)
val result = Await.result(pollForElephant, Duration.Inf)
println(result)
这是其中一次运行的输出:
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => true
java.lang.Thread.getStackTrace(Thread.java:1556)
so.TestApp$$anonfun$so$TestApp$$checkForElephant.apply(TestApp.scala:97)
so.TestApp$$anonfun$so$TestApp$$checkForElephant.apply(TestApp.scala:94)
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
您可能会看到,尽管 checkForElephant
前 7 次返回 None
,但堆栈跟踪很浅。
旁注
我不喜欢你的方法的一点是,当你休眠 10 秒时,你只是阻塞了一些线程。这对我来说似乎效率很低。如果你想有很多这样的电话,你可以考虑使用更聪明的东西,比如 Java ScheduledThreadPoolExecutor 或 Akka Actors.
更新
But would it leak memory, the logical equivalent of stack frames, maintained as objects on the heap?
不,不应该,除非你的 checkForElephant
中有一些非常奇怪的东西。要发生内存泄漏,一些内存应该由某些 "root" 保留。可能的根是:静态变量、线程局部变量和堆栈。正如我们所知,堆栈不会增长,因此它不会成为泄漏的来源。如果你不搞砸本地静态 and/or 线程,你应该是安全的。
至于线程消耗,如果你的系统真的只有一个"elephant",我不认为有什么明显更好。但是,如果您的 checkForElephant
实际上是 checkForElephant(id)
,那么您可能会无缘无故地消耗大量线程。改进这一点的第一步可能是使用 Promise
和 ScheduledThreadPoolExecutor
(我不知道 Scala 等效于它)并牺牲一些功能样式以更好地使用线程,例如:
// Just 1 thread should be enough assuming checkForElephant schedules
// it's Future on some executor rather than current thread
val scheduledExecutor = new ScheduledThreadPoolExecutor(1)
def pollForElephant: Future[Elephant] = {
def scheduleDelayedPoll(p: Promise[Elephant]) = {
scheduledExecutor.schedule(new Runnable {
override def run() = poll(p)
},
10, TimeUnit.SECONDS)
}
def poll(p: Promise[Elephant]): Unit = {
checkForElephant.onComplete {
case s: Success[Option[Elephant]] => if (s.value.isDefined) p.success(s.value.get) else scheduleDelayedPoll(p)
case f: Failure[_] => scheduleDelayedPoll(p)
}
}
val p = Promise[Elephant]()
poll(p)
p.future
}
如果您有更多负载,下一步将是为您的 checkForElephant
使用一些非阻塞 I/O 以不阻塞网络请求的线程。如果您实际使用 Web 服务,请查看 Play WS API which is a Scala-wrapper around AsyncHttpClient which is in turn based on Netty
我有一个通过网络查找 Elephant
的函数,returning Future[Option[Elephant]]
。 return 值是一个 Future
,因此当网络调用异步发生时,该函数可以立即 return。它包含一个 Option
,其中 None
表示尚未可用,而 Some
表示已找到大象:
def checkForElephant : Future[Option[Elephant]] = ???
我想做的是编写一个名为 pollForElephant
.
def pollForElephant : Future[Elephant] = ???
这个函数应该 return 一个 Future
超过一个进程,该进程将调用 checkForElephant
并且如果在第一次检查中找到一个元素就会很快成功,但此后每 10 次检查一次直到找到 Elephant
秒,即使没有大象并且它必须永远尝试。
最简单的方法就是强制同步检查,在 Future
域之外写一个递归函数来轮询,然后在整个事件上创建一个 Future
:
import scala.annotation.tailrec
import scala.concurrent.{Await,Future,blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
class Elephant;
def checkForElephant : Future[Option[Elephant]] = ???
def synchronousCheckForElephant : Option[Elephant] = blocking {
Await.result( checkForElephant, Duration.Inf )
}
@tailrec
def poll( last : Option[Elephant] ) : Elephant = {
last match {
case Some( elephant ) => elephant
case None => {
blocking {
Thread.sleep( 10.seconds.toMillis )
}
poll( synchronousCheckForElephant )
}
}
}
def pollForElephant : Future[Elephant] = Future {
poll( synchronousCheckForElephant )
}
这似乎非常不雅,从 Future
域开始,强制同步,然后返回。我认为我应该能够从 Future
开始做所有事情。所以,我尝试了这个:
import scala.annotation.tailrec
import scala.concurrent.{Await,Future,blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
class Elephant;
def checkForElephant : Future[Option[Elephant]] = ???
// oops! this is not @tailrec
def poll( last : Future[Option[Elephant]] ) : Future[Elephant] = {
last.flatMap { mbElephant =>
mbElephant match {
case Some( elephant ) => Future.successful( elephant )
case None => {
blocking {
Thread.sleep( 10.seconds.toMillis )
}
poll( checkForElephant )
}
}
}
}
def pollForElephant : Future[Elephant] = poll( checkForElephant )
不幸的是,正如上面的评论所说,poll(...)
函数不是尾递归的。大象可能需要很长时间才能到达,我应该无限期地等待,但堆栈可能会爆炸。
整个事情感觉有点奇怪。我应该回到更容易推理的同步方法吗?在 Future
?
我同意@PH88 的评论:您不需要调用尾递归,因为在 flatMap
中的 checkForElephant
中您创建了一个新的 Future
,因此创建了一个新的堆栈。这是一个简单的代码,我试图模拟你的 checkForElephant
:
type Elephant = String
val rnd = new Random()
def checkForElephant: Future[Option[Elephant]] = Future({
val success = rnd.nextDouble() < 0.2
println(s"Call to checkForElephant => $success")
if (success) Some(Thread.currentThread().getStackTrace().mkString("\n")) else None
})
def poll(last: Future[Option[Elephant]]): Future[Elephant] = {
last flatMap {
case Some(elephant) => Future.successful(elephant)
case None => {
blocking {
println("Sleeping")
Thread.sleep(100.millisecond.toMillis)
}
poll(checkForElephant)
}
}
}
def pollForElephant: Future[Elephant] = poll(checkForElephant)
val result = Await.result(pollForElephant, Duration.Inf)
println(result)
这是其中一次运行的输出:
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => true
java.lang.Thread.getStackTrace(Thread.java:1556)
so.TestApp$$anonfun$so$TestApp$$checkForElephant.apply(TestApp.scala:97)
so.TestApp$$anonfun$so$TestApp$$checkForElephant.apply(TestApp.scala:94)
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
您可能会看到,尽管 checkForElephant
前 7 次返回 None
,但堆栈跟踪很浅。
旁注
我不喜欢你的方法的一点是,当你休眠 10 秒时,你只是阻塞了一些线程。这对我来说似乎效率很低。如果你想有很多这样的电话,你可以考虑使用更聪明的东西,比如 Java ScheduledThreadPoolExecutor 或 Akka Actors.
更新
But would it leak memory, the logical equivalent of stack frames, maintained as objects on the heap?
不,不应该,除非你的 checkForElephant
中有一些非常奇怪的东西。要发生内存泄漏,一些内存应该由某些 "root" 保留。可能的根是:静态变量、线程局部变量和堆栈。正如我们所知,堆栈不会增长,因此它不会成为泄漏的来源。如果你不搞砸本地静态 and/or 线程,你应该是安全的。
至于线程消耗,如果你的系统真的只有一个"elephant",我不认为有什么明显更好。但是,如果您的 checkForElephant
实际上是 checkForElephant(id)
,那么您可能会无缘无故地消耗大量线程。改进这一点的第一步可能是使用 Promise
和 ScheduledThreadPoolExecutor
(我不知道 Scala 等效于它)并牺牲一些功能样式以更好地使用线程,例如:
// Just 1 thread should be enough assuming checkForElephant schedules
// it's Future on some executor rather than current thread
val scheduledExecutor = new ScheduledThreadPoolExecutor(1)
def pollForElephant: Future[Elephant] = {
def scheduleDelayedPoll(p: Promise[Elephant]) = {
scheduledExecutor.schedule(new Runnable {
override def run() = poll(p)
},
10, TimeUnit.SECONDS)
}
def poll(p: Promise[Elephant]): Unit = {
checkForElephant.onComplete {
case s: Success[Option[Elephant]] => if (s.value.isDefined) p.success(s.value.get) else scheduleDelayedPoll(p)
case f: Failure[_] => scheduleDelayedPoll(p)
}
}
val p = Promise[Elephant]()
poll(p)
p.future
}
如果您有更多负载,下一步将是为您的 checkForElephant
使用一些非阻塞 I/O 以不阻塞网络请求的线程。如果您实际使用 Web 服务,请查看 Play WS API which is a Scala-wrapper around AsyncHttpClient which is in turn based on Netty