这种僵局在 Scala Future 中是如何发生的?

How this deadlock happens in Scala Future?

此片段摘自 Monix document。 这是一个如何在Scala中进入死锁的例子。

import java.util.concurrent.Executors
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

def addOne(x: Int) = Future(x + 1)

def multiply(x: Int, y: Int) = Future {
  val a = addOne(x)
  val b = addOne(y)
  val result = for (r1 <- a; r2 <- b) yield r1 * r2

  // This can dead-lock due to the limited size of our thread-pool!
  Await.result(result, Duration.Inf)
}

我了解代码的作用,但不了解代码的执行方式。

为什么是 Await.result(result, Duration.Inf) 行导致死锁? (是的,我测试过)

是不是最外层的Future at multiply 函数占用了所有的线程池(唯一的)并因此死锁(因为addOne future 永远阻塞在等待线程上)?

Await.result(result, Duration.Inf)

当您使用 await 时,您正在等待未来完成。你给了无限的时间。因此,如果无论如何 Future 永远无法完成,主线程将进入无限等待。

首先我要说的是这段代码可以模拟死锁,不保证一直死锁 上面的代码发生了什么。我们在线程池中只有一个线程。一旦我们调用多重函数,因为它是未来的,所以它应该 运行 在一个单独的线程上说我们将线程池中的单个线程分配给这个函数。 现在函数 addOne 也是一个 future,所以它将再次在同一个线程上启动 运行ning,但不会等待 a=addOne 完成并移动到下一行 b=addOne 因此是同一个线程执行 a=addOne 现在执行 b=addOne 并且永远不会计算所有的值,并且未来不完整并且永远不会完成,因为我们只有一个线程,与它控制的行 b=addOne 相同的情况将不等待完成那个未来并移动到 for 循环,因为在 Scala 中也是异步的,所以它不会再次评估并移动到最后一行等待,它将等待无限长的时间来完成以前的期货。

进入死锁的充要条件

  1. 互斥条件
  2. 保持并等待条件
  3. 无抢先条件
  4. 循环等待条件

这里我们可以看到我们只有一个线程,所以要执行的进程不是互斥的。

一旦线程正在执行特定的块,因此它是一个未来而不是等待完成它,它会继续执行下一个块因此它到达 await 语句并且线程在那里等待所有其他未来未完成的正在等待线程完成未来。

一旦线程被分配给await,它就不能被抢占,这就是我们不能执行剩余未完成的未来的原因。

循环等待是因为 await 正在等待未完成的 future 完成,而其他 future 正在等待 await 调用完成。

简单地说,我们可以说控件将直接到达 await 语句并开始等待未完成的 future 完成,这无论如何都不会发生。因为我们的线程池中只有一个线程。

Is not that the outermost Future at multiply function occupy all the thread pool(the single one) and thus deadlock (because the addOne future is forever blocked on waiting for thread)?

是的,有点。

当您调用 val a = addOne(x) 时,您创建了一个开始等待线程的新 Future。但是,正如您所指出的,唯一的线程当前正在被最外层的 Future 使用。如果没有 await,那将不是问题,因为 Futures 能够处理这种情况。但是,这一行:

Await.result(result, Duration.Inf)

导致外部 Future 等待 result Future,后者不能 运行 因为外部 Future 仍在使用唯一可用的线程。 (当然,它也不能 运行 因为 ab Future 不能 运行,同样是由于外部 Future。)

这是一个更简单的例子,它也没有创建那么多 Futures 就死锁了:

def addTwo(x: Int) = Future {
  Await.result(addOne(x + 1), Duration.Inf)
}