使用 Future.map 组合器时的 Future(<not completed>)

Future(<not completed>) when Future.map Combinator Used

用 Future.map 处理第一个 Future 之后,当 Future.onComplete 的回调被触发时,第二个(依赖于第一个的)Future 总是显示为 Future(<not completed>)。其他组合两个 Future 的写法从未出现过这种行为。

谁能解释一下,为什么在 Future 显然尚未完成的情况下,Future.onComplete 仍然会被调用?

使用 Scala 2.12.3。如果需要,剪切并粘贴以下内容以评估此问题。

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}


/**
 * Self-contained demonstration of how `Future.map`, a `for` comprehension and
 * nested `onComplete` callbacks behave when a second future is started from
 * the result of a first one.
 *
 * Every future built through [[createRegisteredFuture]] is tracked in
 * [[futures]] so that [[loopTillCompleted]] can keep the calling thread busy
 * until the tracked futures have finished, printing each outcome.
 */
object FutureNotCompleted {

  /** Id of the calling thread, formatted with `%2d` (right-aligned, width two). */
  def threadNumber: String = f"${Thread.currentThread().getId.toInt}%2d"

  /////////////////////////////////////////////////////
  // Future completion control
  /////////////////////////////////////////////////////

  /**
   * Futures that have been registered and not yet reported.
   *
   * `ListBuffer` is not thread-safe and `add` is reached from callbacks that
   * run on the execution context, so every access in this object goes through
   * this object's monitor.
   */
  var futures: ListBuffer[Future[Any]] = ListBuffer()

  /** Registers `future` so that [[loopTillCompleted]] waits for it. */
  def add(future: Future[Any]): Unit = synchronized {
    futures += future
  }

  /** Drops every registered entry equal to `future`. */
  def remove(future: Future[Any]): Unit = synchronized {
    futures = futures.filter(_ != future)
  }

  /**
   * Polls the registered futures every 100 ms. Each completed future has its
   * result printed and is then unregistered; the method returns once the
   * registry is observed empty at the end of a polling pass.
   *
   * NOTE: a future that gets registered from a callback after the registry
   * was observed empty is not waited for.
   */
  def loopTillCompleted: Unit = {
    var pending = true

    while (pending) {
      Thread.sleep(100)

      // Work on a snapshot taken under the lock: callbacks on other threads
      // may call `add` while this pass is running.
      val snapshot = synchronized(futures.toList)

      snapshot.foreach { future =>
        // `value` is defined exactly when the future has completed.
        future.value.foreach { outcome =>
          outcome match {
            case Success(v) => println(s"${threadNumber} Success: ${v}")
            case Failure(e) => println(s"${threadNumber} Error: ${e}")
          }
          remove(future)
        }
      }

      pending = synchronized(futures.nonEmpty)
    }
  }

  /////////////////////////////////////////////////////
  // Future factory
  /////////////////////////////////////////////////////

  /** Starts a future via [[createFuture]] and registers it before returning it. */
  def createRegisteredFuture: Future[Int] = {
    val future = createFuture
    add(future)
    future
  }

  /**
   * Starts a future that picks a random integer in [0, 1000), sleeps for up to
   * one second, and then either yields that integer or, with probability of
   * roughly 0.3, fails with a `RuntimeException`.
   */
  def createFuture: Future[Int] = Future {
    val i = (Math.random() * 1000).toInt
    println(s"${threadNumber} Future work start: ${i}")
    Thread.sleep((Math.random() * 1000).toLong)
    println(s"${threadNumber} Future work stop:  ${i}")
    if (Math.random > 0.7) throw new RuntimeException(s"${threadNumber} Error for ${i}")
    i
  }

  /////////////////////////////////////////////////////
  // Functions exhibiting Future use conditions
  /////////////////////////////////////////////////////

  /**
   * Demonstrates the behaviour under discussion, and is intentionally left
   * using `map`.
   *
   * `createRegisteredFuture` itself returns a `Future[Int]`, so mapping with
   * it produces a `Future[Future[Int]]`. The outer future completes as soon as
   * the function passed to `map` has returned, which is right after the inner
   * future was *started*. `onComplete` therefore receives the inner future
   * itself, typically still running, rather than its `Int` result. Chaining
   * with `flatMap` instead would make `f2` a `Future[Int]` that completes
   * together with the inner future.
   */
  def futureDoesNotComplete: Unit = {

    val f1 = createRegisteredFuture
    val f2: Future[Future[Int]] = f1.map {
      _ => createRegisteredFuture
    }
    // `j` below is the inner Future, not its value: only the outer future has
    // completed when this callback runs.
    f2.onComplete({
      case Success(j) => println(s"${threadNumber} j: ${j} ")
      case Failure(e) => println(s"${threadNumber} f2 Failure: ${e}")
    })

    loopTillCompleted
    println(s"${threadNumber} All done.")
  }

  /**
   * Starts the second future from inside the first future's success callback
   * and attaches `onComplete` directly to that second future, so `j` is the
   * second future's `Int` result.
   */
  def futureCompletes: Unit = {

    val f1 = createRegisteredFuture

    f1.onComplete({
      case Success(i) => {
        val f2 = createRegisteredFuture
        f2.onComplete({
          case Success(j) => println(s"${threadNumber} i: ${i} j: ${j} ${i}+${j}=${i + j}")
          case Failure(e) => println(s"${threadNumber} f2 Failure: ${e}")
        })
      }
      case Failure(e) => println(s"${threadNumber} f1 Failure: ${e}")
    })

    loopTillCompleted
    println(s"${threadNumber} All done.")
  }

  /**
   * Chains two futures with a `for` comprehension; `f1` and `f2` are bound to
   * the `Int` results, not to the futures. The future produced by the
   * comprehension is not kept.
   */
  def futureCompletesFor: Unit = {

    for {
      f1 <- createRegisteredFuture
      f2 <- createRegisteredFuture
    } yield {
      println(s"f1: ${f1} f2: ${f2}: f1+f2=${f1+f2}")
    }

    loopTillCompleted
    println(s"${threadNumber} All done.")
  }

  /** Entry point: runs one of the scenarios above (the others are commented out). */
  def main(a: Array[String]): Unit = {
    //    futureCompletes
    futureDoesNotComplete
    //    futureCompletesFor
  }

}

我不能 100% 确定你期望的行为到底是什么,但我会在这里改用 flatMap:我认为你真正想要的是对整条 Future 链调用 onComplete。按目前使用 map 的写法,完成的只是外层的 Future——我怀疑这就是问题所在,因为你用 for 推导式的版本(能正常工作)实际上就等价于 flatMap。

10 Future work start: 791
10 Future work stop:  791
11 Future work start: 819
11 Future work stop:  819
11 j: 819 
 1 Success: 791
 1 Success: 819
 1 All done.

我会把这里的 map 改为 flatMap,因为 createRegisteredFuture 本身返回的就是一个 Future。

// flatMap flattens the Future returned by createRegisteredFuture, so f2
// completes with that inner future's result instead of wrapping the future.
val f2 = f1.flatMap(_ => createRegisteredFuture)