Scala Future for comprehension:顺序与并行
Scala Future for comprehension: sequential vs parallel
这里我们有 SeqPar
对象,它包含一个 task
例程,它是一个简单的 mock Future
,它打印出一些调试信息和 returns Future[Int]
类型。
问题是:为什么允许 experiment1
并行 运行,而 experiment2
总是 运行 顺序?
object SeqPar {
def experiment1: Int = {
val f1 = task(1)
val f2 = task(2)
val f3 = task(3)
val computation = for {
r1 <- f1
r2 <- f2
r3 <- f3
} yield (r1 + r2 + r3)
Await.result(computation, Duration.Inf)
}
def experiment2: Int = {
val computation = for {
r1 <- task(1)
r2 <- task(2)
r3 <- task(3)
} yield (r1 + r2 + r3)
Await.result(computation, Duration.Inf)
}
def task(i: Int): Future[Int] = {
Future {
println(s"task=$i thread=${Thread.currentThread().getId} time=${System.currentTimeMillis()}")
i * i
}
}
}
当我 运行 experiment1
它打印出来:
task=3 thread=24 time=1541326607613
task=1 thread=22 time=1541326607613
task=2 thread=21 time=1541326607613
同时 experiment2
:
task=1 thread=21 time=1541326610653
task=2 thread=20 time=1541326610653
task=3 thread=21 time=1541326610654
观察到的差异的原因是什么?我确实知道 for
理解像 f1.flatMap(r1 => f2.flatMap(r2 => f3.map(r3 => r1 + r2 + r3)))
一样脱糖,但我仍然遗漏了一点,为什么允许一个人并行 运行 而另一个人不允许。
这是因为 Scala Futures 是严格的。 Future 中的操作会在 Future 创建后立即执行,然后记忆其值。所以你正在失去参照透明度。在你的情况下,你的期货是在你的第一个 task 调用中执行的,结果被记住了。它们不会在 for 中再次执行。在第二种情况下,期货是在您的理解中创建的,结果是正确的。
这是 Future(…)
和 flatMap
的效果:
val future = Future(task)
并行启动 运行ning 任务
future.flatMap(result => task)
安排运行宁task
当future
完成
请注意,future.flatMap(result => task)
无法在 future
完成之前并行启动 运行ning 任务,因为要 运行 task
,我们需要 result
,仅在 future
完成时可用。
现在让我们看看你的 example1
:
def experiment1: Int = {
// construct three independent tasks and start running them
val f1 = task(1)
val f2 = task(2)
val f3 = task(3)
// construct one complicated task that is ...
val computation =
// ... waiting for f1 and then ...
f1.flatMap(r1 =>
// ... waiting for f2 and then ...
f2.flatMap(r2 =>
// ... waiting for f3 and then ...
f3.map(r3 =>
// ... adding some numbers.
r1 + r2 + r3)))
// now actually trigger all the waiting
Await.result(computation, Duration.Inf)
}
所以在example1
中,由于所有三个任务都花费相同的时间并且同时启动,所以我们可能只需要在等待f1
时阻塞。当我们开始等待 f2
时,它的结果应该已经存在了。
现在 example2
有何不同?
def experiment2: Int = {
// construct one complicated task that is ...
val computation =
// ... starting task1 and then waiting for it and then ...
task(1).flatMap(r1 =>
// ... starting task2 and then waiting for it and then ...
task(2).flatMap(r2 =>
// ... starting task3 and then waiting for it and then ...
task(3).map(r3 =>
// ... adding some numbers.
r1 + r2 + r3)))
// now actually trigger all the waiting and the starting of tasks
Await.result(computation, Duration.Inf)
}
在这个例子中,我们甚至没有在等待 task(1)
完成之前构造 task(2)
,因此任务不能 运行 并行。
因此,当使用 Scala 的 Future
编程时,您必须通过在 example1
和 example2
等代码之间正确选择来控制并发性。或者您可以查看提供更明确的并发控制的库。
这里我们有 SeqPar
对象,它包含一个 task
例程,它是一个简单的 mock Future
,它打印出一些调试信息和 returns Future[Int]
类型。
问题是:为什么允许 experiment1
并行 运行,而 experiment2
总是 运行 顺序?
object SeqPar {
def experiment1: Int = {
val f1 = task(1)
val f2 = task(2)
val f3 = task(3)
val computation = for {
r1 <- f1
r2 <- f2
r3 <- f3
} yield (r1 + r2 + r3)
Await.result(computation, Duration.Inf)
}
def experiment2: Int = {
val computation = for {
r1 <- task(1)
r2 <- task(2)
r3 <- task(3)
} yield (r1 + r2 + r3)
Await.result(computation, Duration.Inf)
}
def task(i: Int): Future[Int] = {
Future {
println(s"task=$i thread=${Thread.currentThread().getId} time=${System.currentTimeMillis()}")
i * i
}
}
}
当我 运行 experiment1
它打印出来:
task=3 thread=24 time=1541326607613
task=1 thread=22 time=1541326607613
task=2 thread=21 time=1541326607613
同时 experiment2
:
task=1 thread=21 time=1541326610653
task=2 thread=20 time=1541326610653
task=3 thread=21 time=1541326610654
观察到的差异的原因是什么?我确实知道 for
理解像 f1.flatMap(r1 => f2.flatMap(r2 => f3.map(r3 => r1 + r2 + r3)))
一样脱糖,但我仍然遗漏了一点,为什么允许一个人并行 运行 而另一个人不允许。
这是因为 Scala Futures 是严格的。 Future 中的操作会在 Future 创建后立即执行,然后记忆其值。所以你正在失去参照透明度。在你的情况下,你的期货是在你的第一个 task 调用中执行的,结果被记住了。它们不会在 for 中再次执行。在第二种情况下,期货是在您的理解中创建的,结果是正确的。
这是 Future(…)
和 flatMap
的效果:
val future = Future(task)
并行启动 运行ning 任务future.flatMap(result => task)
安排运行宁task
当future
完成
请注意,future.flatMap(result => task)
无法在 future
完成之前并行启动 运行ning 任务,因为要 运行 task
,我们需要 result
,仅在 future
完成时可用。
现在让我们看看你的 example1
:
def experiment1: Int = {
// construct three independent tasks and start running them
val f1 = task(1)
val f2 = task(2)
val f3 = task(3)
// construct one complicated task that is ...
val computation =
// ... waiting for f1 and then ...
f1.flatMap(r1 =>
// ... waiting for f2 and then ...
f2.flatMap(r2 =>
// ... waiting for f3 and then ...
f3.map(r3 =>
// ... adding some numbers.
r1 + r2 + r3)))
// now actually trigger all the waiting
Await.result(computation, Duration.Inf)
}
所以在example1
中,由于所有三个任务都花费相同的时间并且同时启动,所以我们可能只需要在等待f1
时阻塞。当我们开始等待 f2
时,它的结果应该已经存在了。
现在 example2
有何不同?
def experiment2: Int = {
// construct one complicated task that is ...
val computation =
// ... starting task1 and then waiting for it and then ...
task(1).flatMap(r1 =>
// ... starting task2 and then waiting for it and then ...
task(2).flatMap(r2 =>
// ... starting task3 and then waiting for it and then ...
task(3).map(r3 =>
// ... adding some numbers.
r1 + r2 + r3)))
// now actually trigger all the waiting and the starting of tasks
Await.result(computation, Duration.Inf)
}
在这个例子中,我们甚至没有在等待 task(1)
完成之前构造 task(2)
,因此任务不能 运行 并行。
因此,当使用 Scala 的 Future
编程时,您必须通过在 example1
和 example2
等代码之间正确选择来控制并发性。或者您可以查看提供更明确的并发控制的库。