How to efficiently combine future results as a future

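I have many computations that contribute to one final result, with no restriction on the order in which they contribute. It seems that Futures should be able to speed this up, and they do, but not in the way I expected. Here is code that compares the performance of a very silly way of doing integer division: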

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object scale_me_up {
  def main(args: Array[String]): Unit = {
    val M = 500 * 1000
    val N = 5
    Thread.sleep(3210) // let launcher settle down
    for (it <- 0 until 15) {
      val method = it % 3
      val start = System.currentTimeMillis()
      val result = divide(M, N, method)
      val elapsed = System.currentTimeMillis() - start
      assert(result == M / N)
      if (it >= 6) {
        val methods = Array("ordinary", "fast parallel", "nice parallel")
        val name = methods(method)
        println(f"$name%15s: $elapsed ms")
      }
    }
  }

  def is_multiple_of(m: Int, n: Int): Boolean = {
    val result = !(1 until n).map(_ + (m / n) * n).toSet.contains(m)
    assert(result == (m % n == 0)) // yes, a less crazy implementation exists
    result
  }

  def divide(m: Int, n: Int, method: Int): Int = {
    method match {
      case 0 =>
        (1 to m).count(is_multiple_of(_, n))
      case 1 =>
        (1 to m)
          .map { x =>
            Future { is_multiple_of(x, n) }
          }
          .count(Await.result(_, Duration.Inf))
      case 2 =>
        Await.result(divide_futuristically(m, n), Duration.Inf)
    }
  }

  def divide_futuristically(m: Int, n: Int): Future[Int] = {
    val futures = (1 to m).map { x =>
      Future { is_multiple_of(x, n) }
    }
    Future.foldLeft(futures)(0) { (count, flag) =>
      { if (flag) { count + 1 } else { count } }
    }
    /* much worse performing alternative:
    Future.sequence(futures).map(_.count(identity))
    */
  }
}

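When I run this, the parallel case 1 is somewhat faster than the ordinary case 0 code (hurray), but case 2 takes twice as long. Of course, this depends on the system and on whether each future does enough work (which here grows with the denominator N) to offset the concurrency overhead. [PS] As expected, decreasing N puts case 0 in the lead, and increasing N enough makes both case 1 and case 2 about twice as fast as case 0 on my 2 core CPU.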

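I believe divide_futuristically is the better way to express this kind of computation: it returns a future holding the combined result. Blocking is only needed here to measure performance. In practice, though, the more we block, the sooner everything finishes. What am I doing wrong? Several alternatives for combining the futures all incur the same penalty.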

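[PPS] This was Scala 2.12 running on Java 11 on a 2 core CPU. With Java 12 on a 6 core CPU, the difference is much smaller (although the sequence alternative still drags its feet). With Scala 2.13, the difference is smaller still, and as the amount of work per iteration increases, divide_futuristically starts to outperform the competition. The future is finally here...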

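It looks like you are doing everything right. I tried different approaches myself, even .par, but got the same or worse results.

I dug into Future.foldLeft and tried to analyze what causes the latency: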

  /** A non-blocking, asynchronous left fold over the specified futures,
   *  with the start value of the given zero.
   *  The fold is performed asynchronously in left-to-right order as the futures become completed.
   *  The result will be the first failure of any of the futures, or any failure in the actual fold,
   *  or the result of the fold.
   *
   *  Example:
   *  {{{
   *    val futureSum = Future.foldLeft(futures)(0)(_ + _)
   *  }}}
   *
   * @tparam T       the type of the value of the input Futures
   * @tparam R       the type of the value of the returned `Future`
   * @param futures  the `scala.collection.immutable.Iterable` of Futures to be folded
   * @param zero     the start value of the fold
   * @param op       the fold operation to be applied to the zero and futures
   * @return         the `Future` holding the result of the fold
   */
  def foldLeft[T, R](futures: scala.collection.immutable.Iterable[Future[T]])(zero: R)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
    foldNext(futures.iterator, zero, op)

  private[this] def foldNext[T, R](i: Iterator[Future[T]], prevValue: R, op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
    if (!i.hasNext) successful(prevValue)
    else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) }

This part:

else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) }

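.flatMap produces a new Future, and its callback is submitted to the executor. In other words, every

    { (count, flag) =>
      { if (flag) { count + 1 } else { count } }
    }

is executed as a new Future, one per element, and each one can only start after the previous step of the fold has completed.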
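One way to check this is to count how many tasks the fold hands to its executor. The following is only a sketch, not a benchmark: CountingContext and foldAndCount are hypothetical names made up for illustration, and the inputs are already-completed futures so that only the fold's own callbacks get counted.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical helper (not from the post or the standard library):
// wraps an ExecutionContext and counts the tasks handed to it.
final class CountingContext(underlying: ExecutionContext) extends ExecutionContext {
  val submitted = new AtomicInteger(0)

  override def execute(runnable: Runnable): Unit = {
    submitted.incrementAndGet()
    underlying.execute(runnable)
  }

  override def reportFailure(cause: Throwable): Unit =
    underlying.reportFailure(cause)
}

object FoldSubmissions {
  // Folds n already-completed futures and returns
  // (fold result, number of tasks submitted to the executor).
  def foldAndCount(n: Int): (Int, Int) = {
    val counting = new CountingContext(ExecutionContext.global)
    // Future.successful needs no executor, so every submission
    // counted here is caused by the fold itself.
    val flags = (1 to n).map(x => Future.successful(x % 5 == 0))
    val folded = Future.foldLeft(flags)(0) { (count, flag) =>
      if (flag) count + 1 else count
    }(counting)
    (Await.result(folded, Duration.Inf), counting.submitted.get)
  }

  def main(args: Array[String]): Unit = {
    val (result, tasks) = foldAndCount(1000)
    println(s"result = $result, tasks submitted by the fold = $tasks")
  }
}
```

If the reasoning above is right, the number of submitted tasks should be at least the number of input futures, even though each task does nothing more than a comparison and an increment.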

I think this part causes the latency observed in the experiment.
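If that is the cause, one possible way to reduce the overhead is to give each Future a whole batch of numbers, so that the fold chain has one link per batch instead of one per number. This is a minimal sketch under that assumption. I have not measured it; divideBatched and batchSize are hypothetical names, and the default batch size is an arbitrary guess.

```scala
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object BatchedDivide {
  // Same deliberately silly divisibility test as in the question.
  def isMultipleOf(m: Int, n: Int): Boolean =
    !(1 until n).map(_ + (m / n) * n).toSet.contains(m)

  // batchSize is a hypothetical tuning knob; 10000 is an arbitrary guess.
  def divideBatched(m: Int, n: Int, batchSize: Int = 10000)(
      implicit ec: ExecutionContext): Future[Int] = {
    // One Future per batch; each counts its own slice sequentially.
    val partialCounts = (1 to m).grouped(batchSize).toVector.map { batch =>
      Future(batch.count(isMultipleOf(_, n)))
    }
    // The fold chain now has one link per batch, not one per number.
    Future.foldLeft(partialCounts)(0)(_ + _)
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val m = 500 * 1000
    val n = 5
    val result = Await.result(divideBatched(m, n), Duration.Inf)
    println(s"result = $result")
  }
}
```

The function still returns a Future[Int], so callers can keep composing it without blocking. Whether it actually beats the other variants depends on the batch size, the Scala version, and the machine.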