Scala 中的尾递归函数轮询

Tail-recursive, functional polling in Scala

我有一个通过网络查找 Elephant 的函数,returning Future[Option[Elephant]]。 return 值是一个 Future,因此当网络调用异步发生时,该函数可以立即 return。它包含一个 Option,其中 None 表示尚未可用,而 Some 表示已找到大象:

def checkForElephant : Future[Option[Elephant]] = ???

我想做的是编写一个名为 pollForElephant.

的函数
def pollForElephant : Future[Elephant] = ???

这个函数应该 return 一个 Future 超过一个进程,该进程将调用 checkForElephant 并且如果在第一次检查中找到一个元素就会很快成功,但此后每 10 次检查一次直到找到 Elephant 秒,即使没有大象并且它必须永远尝试。

最简单的方法就是强制同步检查,在 Future 域之外写一个递归函数来轮询,然后在整个事件上创建一个 Future :

import scala.annotation.tailrec
import scala.concurrent.{Await,Future,blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class Elephant;

def checkForElephant : Future[Option[Elephant]] = ???

def synchronousCheckForElephant : Option[Elephant] = blocking {
  Await.result( checkForElephant, Duration.Inf )
}

@tailrec
def poll( last : Option[Elephant] ) : Elephant = {
  last match {
    case Some( elephant ) => elephant
    case None => {
      blocking {
        Thread.sleep( 10.seconds.toMillis )
      }
      poll( synchronousCheckForElephant )
    }
  }
}

def pollForElephant : Future[Elephant] = Future {
   poll( synchronousCheckForElephant )
}

这似乎非常不雅,从 Future 域开始,强制同步,然后返回。我认为我应该能够从 Future 开始做所有事情。所以,我尝试了这个:

import scala.annotation.tailrec
import scala.concurrent.{Await,Future,blocking}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class Elephant;

def checkForElephant : Future[Option[Elephant]] = ???

// oops! this is not @tailrec
def poll( last : Future[Option[Elephant]] ) : Future[Elephant] = {
  last.flatMap { mbElephant =>
    mbElephant match {
      case Some( elephant ) => Future.successful( elephant )
      case None => {
        blocking {
          Thread.sleep( 10.seconds.toMillis )
        }
        poll( checkForElephant )
      }
    }
  }
}

def pollForElephant : Future[Elephant] = poll( checkForElephant )

不幸的是,正如上面的评论所说,poll(...) 函数不是尾递归的。大象可能需要很长时间才能到达,我应该无限期地等待,但堆栈可能会爆炸。

整个事情感觉有点奇怪。我应该回到更容易推理的同步方法吗?在 Future?

中,有什么安全的方法可以实现我的意思吗?

我同意@PH88 的评论:您不需要调用尾递归,因为在 flatMap 中的 checkForElephant 中您创建了一个新的 Future,因此创建了一个新的堆栈。这是一个简单的代码,我试图模拟你的 checkForElephant:

type Elephant = String
val rnd = new Random()

def checkForElephant: Future[Option[Elephant]] = Future({
  val success = rnd.nextDouble() < 0.2
  println(s"Call to checkForElephant => $success")
  if (success) Some(Thread.currentThread().getStackTrace().mkString("\n")) else None
})

def poll(last: Future[Option[Elephant]]): Future[Elephant] = {
  last flatMap {
      case Some(elephant) => Future.successful(elephant)
      case None => {
        blocking {
          println("Sleeping")
          Thread.sleep(100.millisecond.toMillis)
        }
        poll(checkForElephant)
      }
    }
 }

def pollForElephant: Future[Elephant] = poll(checkForElephant)

val result = Await.result(pollForElephant, Duration.Inf)
println(result)

这是其中一次运行的输出:

Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => false
Sleeping
Call to checkForElephant => true
java.lang.Thread.getStackTrace(Thread.java:1556)
so.TestApp$$anonfun$so$TestApp$$checkForElephant.apply(TestApp.scala:97)
so.TestApp$$anonfun$so$TestApp$$checkForElephant.apply(TestApp.scala:94)
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

您可能会看到,尽管 checkForElephant 前 7 次返回 None,但堆栈跟踪很浅。

旁注

我不喜欢你的方法的一点是,当你休眠 10 秒时,你只是阻塞了一些线程。这对我来说似乎效率很低。如果你想有很多这样的电话,你可以考虑使用更聪明的东西,比如 Java ScheduledThreadPoolExecutor 或 Akka Actors.

更新

But would it leak memory, the logical equivalent of stack frames, maintained as objects on the heap?

不,不应该,除非你的 checkForElephant 中有一些非常奇怪的东西。要发生内存泄漏,一些内存应该由某些 "root" 保留。可能的根是:静态变量、线程局部变量和堆栈。正如我们所知,堆栈不会增长,因此它不会成为泄漏的来源。如果你不搞砸本地静态 and/or 线程,你应该是安全的。

至于线程消耗,如果你的系统真的只有一个"elephant",我不认为有什么明显更好。但是,如果您的 checkForElephant 实际上是 checkForElephant(id),那么您可能会无缘无故地消耗大量线程。改进这一点的第一步可能是使用 PromiseScheduledThreadPoolExecutor(我不知道 Scala 等效于它)并牺牲一些功能样式以更好地使用线程,例如:

// Just 1 thread should be enough assuming checkForElephant schedules 
// it's Future on some executor rather than current thread
val scheduledExecutor = new ScheduledThreadPoolExecutor(1)

def pollForElephant: Future[Elephant] = {
  def scheduleDelayedPoll(p: Promise[Elephant]) = {
    scheduledExecutor.schedule(new Runnable {
      override def run() = poll(p)
    },
      10, TimeUnit.SECONDS)
  }

  def poll(p: Promise[Elephant]): Unit = {
    checkForElephant.onComplete {
      case s: Success[Option[Elephant]] => if (s.value.isDefined) p.success(s.value.get) else scheduleDelayedPoll(p)
      case f: Failure[_] => scheduleDelayedPoll(p)
    }
  }

  val p = Promise[Elephant]()
  poll(p)
  p.future
}

如果您有更多负载,下一步将是为您的 checkForElephant 使用一些非阻塞 I/O 以不阻塞网络请求的线程。如果您实际使用 Web 服务,请查看 Play WS API which is a Scala-wrapper around AsyncHttpClient which is in turn based on Netty