concurrent.blocking 将来在某些情况下无法按预期工作

The concurrent.blocking in future not work as expected in some scenario

帮助解释 scala future 的 2 个现象(代码 4 和代码 5 中的粗体),谢谢。

代码1

package com.tst
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class MyActor extends Actor {
  def receive = {
    case _ =>
      for (i <- 1 to 6) {
        Future {
          println("start")
          Thread.sleep(30000)
          println("end")
        }
      }
  }
}

object Test extends App {
  val system = ActorSystem()
  val myActor = system.actorOf(Props[MyActor])
  myActor ! 'msg
}

对于code1,因为我的cpu核心是4个,所以一开始30秒,我们只能看到4个start打印,对我来说没问题。 (如果你的 cpu 有更多的内核,例如 8 个内核,你可以将循环从 6 更改为 10 以重现我的问题)

Code2

package com.tst
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, blocking}

class MyActor extends Actor {
  def receive = {
    case _ =>
      for (i <- 1 to 6) {
        Future {
          blocking {
            println("start")
            Thread.sleep(30000)
            println("end")
          }
        }
      }
  }
}

object Test extends App {
  val system = ActorSystem()
  val myActor = system.actorOf(Props[MyActor])
  myActor ! 'msg
}

对于code2,由于添加了blocking,使用了额外的线程,所以我们一开始可以看到6 start打印,对我来说没问题。

Code3

package com.tst
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, blocking}

class MyActor extends Actor {
  def receive = {
    case _ =>
      for (i <- 1 to 6) {
        Future {
          blocking { // lineA
            println("startpre")
            Thread.sleep(30000)
            println("endpre")
          }
        }
      }

      Thread.sleep(2000)

      for (i <- 1 to 6) {
        println("+")
        Future {
          blocking { // lineB
            println("start")
            Thread.sleep(30000)
            println("end")
          }
        }
      }
  }
}

object Test extends App {
  val system = ActorSystem()
  val myActor = system.actorOf(Props[MyActor])
  myActor ! 'msg
}

对于代码 3,我们可以看到 6 startpre & 6 start 在第一个 30 秒打印,对我来说没问题。

code4

只删除code3中的lineA,输出为:

startpre
startpre
startpre
startpre
+
+
+
+
+
+

这是我的第一个问题:为什么我在前 30 秒只能看到 4 startpre?为什么 lineB 中的 blocking 在这里不起作用?以我的理解,我也应该看到 6 start

code5

只删除code3的lineB,如果在code4中删除lineA记得取消删除,输出为:

startpre
startpre
startpre
startpre
startpre
startpre
+
+
+
+
+
+
start

这是我的第二个问题:这里有1个start可以看到,但是4个线程都已经被占用了,另外为lineA的Future启动了2个线程,为什么还有一个线程留给 lineB 打印 1 start?

Here我的观点描述的不错

将部分代码放在 blocking 中通知执行上下文可能需要一些其他线程结果来完成此阻塞。因此 运行 另一个线程完成加速评估可能是合理的。

换句话说,在你的情况下 code4 4 个线程正忙于从第一个循环开始执行 Futures,它们没有标记为 blocking,所以没有理由添加另一个工作线程在池中,因此,没有线程从第二个循环执行任何新的 Future

code5 中,所有线程都忙于标记为 blockingFuture。其他线程已启动,被另一个没有 blocking 的循环占用 Future,因此没有理由再添加一个线程。