How to efficiently combine future results as a future
I have many computations that each contribute to one final result, and the order of the contributions does not matter. It seemed that Futures ought to be able to speed this up, and they do, but not in the way I expected. Here is code comparing the performance of a deliberately silly way of doing integer division:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object scale_me_up {

  def main(args: Array[String]) {
    val M = 500 * 1000
    val N = 5
    Thread.sleep(3210) // let launcher settle down
    for (it <- 0 until 15) {
      val method = it % 3
      val start = System.currentTimeMillis()
      val result = divide(M, N, method)
      val elapsed = System.currentTimeMillis() - start
      assert(result == M / N)
      if (it >= 6) {
        val methods = Array("ordinary", "fast parallel", "nice parallel")
        val name = methods(method)
        println(f"$name%15s: $elapsed ms")
      }
    }
  }

  def is_multiple_of(m: Int, n: Int): Boolean = {
    val result = !(1 until n).map(_ + (m / n) * n).toSet.contains(m)
    assert(result == (m % n == 0)) // yes, a less crazy implementation exists
    result
  }

  def divide(m: Int, n: Int, method: Int): Int = {
    method match {
      case 0 =>
        (1 to m).count(is_multiple_of(_, n))
      case 1 =>
        (1 to m)
          .map { x =>
            Future { is_multiple_of(x, n) }
          }
          .count(Await.result(_, Duration.Inf))
      case 2 =>
        Await.result(divide_futuristically(m, n), Duration.Inf)
    }
  }

  def divide_futuristically(m: Int, n: Int): Future[Int] = {
    val futures = (1 to m).map { x =>
      Future { is_multiple_of(x, n) }
    }
    Future.foldLeft(futures)(0) { (count, flag) =>
      { if (flag) { count + 1 } else { count } }
    }
    /* much worse performing alternative:
       Future.sequence(futures).map(_.count(identity))
     */
  }
}
When I run this, the parallel case 1 is somewhat faster than the ordinary case 0 code (hooray), but case 2 takes twice as long. Of course this depends on the system, and on whether each future has enough work to do (which here grows with the denominator N) to offset the concurrency overhead. [PS] As expected, decreasing N puts case 0 in the lead, while increasing N enough makes case 1 and case 2 roughly twice as fast as case 0 on my two-core CPU.
I believed divide_futuristically was the nicer way to express this computation: return a future holding the combined result. Blocking is only there because we need it here to measure performance. Yet in practice, the more blocking goes on, the sooner everyone is done. What am I doing wrong? Several alternatives for summing up the futures (such as the Future.sequence variant in the comment) all suffer the same penalty.
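For comparison, here is a minimal sketch of a chunked variant (not part of the code above): each future is handed a whole slice of the range, so the per-future scheduling cost is amortized while the results are still combined as a future. The chunk count of 64 is an arbitrary choice, and a plain modulo test stands in for is_multiple_of:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object divide_chunked_sketch {
  // Each future counts multiples within a whole slice of the range, so far
  // fewer futures are created and folded than in divide_futuristically.
  def divide_chunked(m: Int, n: Int, chunks: Int = 64): Future[Int] = {
    val slices = (1 to m).grouped(math.max(1, m / chunks)).toList
    val futures = slices.map { slice =>
      Future { slice.count(_ % n == 0) } // plain modulo stands in for is_multiple_of
    }
    Future.foldLeft(futures)(0)(_ + _)
  }

  def main(args: Array[String]): Unit = {
    val m = 500 * 1000
    println(Await.result(divide_chunked(m, 5), Duration.Inf)) // expect m / 5
  }
}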
[PPS] This was Scala 2.12 running on Java 11 on a 2-core CPU. With Java 12 on a 6-core CPU the differences are much smaller (although the sequence alternative still drags its feet). With Scala 2.13 the differences are even smaller, and as the work per iteration grows, divide_futuristically starts to outperform the competition. The future has finally arrived...
It looks like you are doing everything right. I tried different approaches myself, even .par, and got the same or worse results. I dug into Future.foldLeft and tried to analyze what causes the delay:
/** A non-blocking, asynchronous left fold over the specified futures,
 * with the start value of the given zero.
 * The fold is performed asynchronously in left-to-right order as the futures become completed.
 * The result will be the first failure of any of the futures, or any failure in the actual fold,
 * or the result of the fold.
 *
 * Example:
 * {{{
 *   val futureSum = Future.foldLeft(futures)(0)(_ + _)
 * }}}
 *
 * @tparam T the type of the value of the input Futures
 * @tparam R the type of the value of the returned `Future`
 * @param futures the `scala.collection.immutable.Iterable` of Futures to be folded
 * @param zero the start value of the fold
 * @param op the fold operation to be applied to the zero and futures
 * @return the `Future` holding the result of the fold
 */
def foldLeft[T, R](futures: scala.collection.immutable.Iterable[Future[T]])(zero: R)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
  foldNext(futures.iterator, zero, op)

private[this] def foldNext[T, R](i: Iterator[Future[T]], prevValue: R, op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
  if (!i.hasNext) successful(prevValue)
  else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) }
This part:

else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) }

is where .flatMap creates a new Future and submits it to the executor. In other words, every

{ (count, flag) =>
  { if (flag) { count + 1 } else { count } }
}

step is executed as its own new Future. I suppose this is what causes the delay the experiment demonstrates.
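To make that visible, here is a minimal sketch (assuming only the standard library) that wraps the global pool in an ExecutionContext which counts every task submission and then runs the same fold through it. With m input futures the count should come out on the order of 2 * m: roughly one submission per Future body plus one per fold step.

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object count_submissions_sketch {
  def main(args: Array[String]): Unit = {
    val submitted = new AtomicInteger(0)

    // Wrap the global pool so that every task handed to the executor is counted.
    implicit val counting: ExecutionContext = new ExecutionContext {
      private val underlying = ExecutionContext.global
      def execute(runnable: Runnable): Unit = {
        submitted.incrementAndGet()
        underlying.execute(runnable)
      }
      def reportFailure(cause: Throwable): Unit = underlying.reportFailure(cause)
    }

    val m = 1000
    val futures = (1 to m).map(x => Future { x % 5 == 0 })
    val result = Await.result(
      Future.foldLeft(futures)(0) { (count, flag) => if (flag) count + 1 else count },
      Duration.Inf)

    // Expect roughly one submission per Future body plus one per fold step.
    println(s"result = $result, tasks submitted = ${submitted.get}")
  }
}

On Scala 2.13 one option worth trying is to pass ExecutionContext.parasitic explicitly as foldLeft's executor, so the cheap fold steps run on the thread that completes each future instead of being resubmitted to the pool; that would be consistent with the smaller differences reported in the PPS.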