嵌套 Future.sequence 顺序执行包含的 Futures

Nested Future.sequence executes included Futures sequentially

我有一个 future(doFour) 被执行并将结果传递给平面图。 在平面图中,我执行了另外两个 future(doOnedoTwo) 函数,期望它们并行 运行 但我看到它们是 运行宁顺序 (2.13)。 Scastie

为什么 doOnedoTwo 不能并行执行?

如何让它们运行并行?

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object Test {
  def doOne(): Future[Unit] = Future {
    println("startFirst");      Thread.sleep(3000);     println("stopFirst")
  }

  def doTwo(): Future[Unit] = Future {
    println("startSecond");      Thread.sleep(1000);      println("stopSecond")
  }


  def doFour(): Future[Unit] = Future {
    println("do 4");     Thread.sleep(1000);     println("done 4")

  }


  def main(args: Array[String]) {


    val resOut = doFour().flatMap { a =>

      val futureOperations = Seq(doOne(), doTwo())

      val res = Future.sequence(futureOperations)
      res
    }

    val stream = Await.result(resOut, Duration.Inf)
  }
}

A Future 一经创建就可以执行。所以这一行创建了两个可能被执行的 Futures

val futureOperations = Seq(doOne(), doTwo())

Future.sequence 的调用将创建一个新的 Future 等待 轮流完成每个期货,但它们都已经完成可在代码中的此时执行。

val res = Future.sequence(futureOperations)

如果你想让Futures开始你需要使用map/flatMap:

val res = doOne().map( _ => doTwo())

使用此代码 doTwo 将不会被调用,直到 doOne 完成(如果 doOne 失败则根本不会调用)

在您的示例中这似乎没有发生的原因是您在 Future 中调用了一个阻塞操作,它阻塞了一个线程,否则该线程将用于执行其他 Future秒。所以虽然有两个Future可以执行,但实际上一次只有一个在执行。

如果您将代码标记为 blocking,它可以正常工作:

import scala.concurrent.blocking

def doOne(): Future[Unit] = Future {
  blocking{println("startFirst");      Thread.sleep(3000);     println("stop First")}
}

def doTwo(): Future[Unit] = Future {
  blocking{println("startSecond");      Thread.sleep(1000);      println("stop Second")}
}

请参阅评论部分,详细了解为什么不同版本的默认行为不同,以及为什么您不应该对独立 Futures 的相对执行顺序做出假设。