协程 - Dispatchers.Main.immediate with join 在 runBlocking 中死锁

Coroutines - Dispatchers.Main.immediate with join is deadlocking inside runBlocking

分解 Android 上的一个简单案例以挂起主线程并使用协程执行并发处理,以下代码仅打印 LaunchedrunBlocking 永远不会完成:

runBlocking {
      val ioJob = launch(Dispatchers.IO) {
        delay(1000)
      }
      val mainJob = launch(Dispatchers.Main.immediate) {
        Log.d("Routine", "Launched")
        ioJob.join()
        Log.d("Routine", "Joined")
      }

      listOf(mainJob, ioJob).joinAll()
    }

当然,如果我们将 Dispatchers.Main.immediate 替换为 Dispatchers.IO 一切正常,但我的一些并发处理应该在 main 上 运行。正如预期的那样,使用 Dispatchers.Main 不会记录任何内容。似乎一旦 joinrunBlocking 的根目录中执行,它就会使发送到 main.

的任何暂停的东西瘫痪。

值得注意的是,基于 CountDownLatch 和线程的方法效果很好:

    val latchMain = CountDownLatch(1)
    val primaryLatch = CountDownLatch(2)

    val ioExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory())

    log("Execute IO")
    ioExecutor.execute(
        Runnable {
          log("Delay IO")
          Thread.sleep(1000)
          log("Countdown Main")
          latchMain.countDown()
          Thread.sleep(3000)
          primaryLatch.countDown()
        })

    log("Execute Main")
    Runnable {
      log("Await Main")
      latchMain.await()
      log("Countdown Primary")
      primaryLatch.countDown()
    }.run()

    log("Await Primary")
    primaryLatch.await()
    log("Continue")

    stepTracker.endTracking()
    return stepTracker.stepGraphTrace
  }

  private fun log(msg: String) = Log.i("Routine", "[${Thread.currentThread().name}] $msg")

输出:

2021-08-11 11:04:06.508 [main] Execute IO
2021-08-11 11:04:06.509 [main] Execute Main
2021-08-11 11:04:06.510 [main] Await Main
2021-08-11 11:04:06.510 [pool-25-thread-1] Delay IO
2021-08-11 11:04:07.512 [pool-25-thread-1] Countdown Main
2021-08-11 11:04:07.513 [main] Countdown Primary
2021-08-11 11:04:07.513 [main] Await Primary
2021-08-11 11:04:10.514 [main] Continue

有什么想法吗?即将就此向 JetBrains 提交问题。

注意:为了解决问题,不要故意用 runBlocking 阻塞 main/UI 线程,因为虽然 UI 线程在 runBlocking 中被释放(如果它暂停)为其子协程,runBlocking 之外的任何内容都不会执行(没有绘制方法,什么都没有),只要 runBlocking 处于活动状态,就会导致冻结 UI。

这可能与“immediate”的实现方式有关。不只是 join(),它是 任何挂起函数,如果你调用 yield() 它将无济于事,如果你调用 delay() mainJob 只有在延迟完成后才会恢复。基本上mainJob不会 只要 runBlocking 是 运行 就恢复,并且 runBlocking 直到 mainJob 完成,根据定义,这是一个 死锁

你可以省略指定 Dispatcher.Main.immediatemainJob 并让它 从 runBlocking 继承其上下文。如果你想尽快开始执行 mainJob 正如它声明的那样,只是从 runBlocking 向它产生线程。

runBlocking {
    log("runBlocking: start")

    val ioJob = launch(Dispatchers.IO) {
        log("ioJob: About to delay")
        delay(1000)
        log("ioJob: Delay done")
    }

    val mainJob = launch {
        log("mainJob: About to join ioJob")
        ioJob.join()
        log("mainJob: ioJob successfully joined")
    }


    // yield() if you want to start executing mainJob before the rest of the runBlocking code

    log("runBlocking: about to join jobs")
    listOf(ioJob, mainJob).joinAll()
    log("runBlocking: jobs joined, exiting")
}

private fun log(msg: String) = Log.i("MainActivity", "[${Thread.currentThread().name}] $msg")

w/o yield()

I/MainActivity: [main] runBlocking: start
I/MainActivity: [main] runBlocking: about to join jobs
I/MainActivity: [DefaultDispatcher-worker-1] ioJob: About to delay
I/MainActivity: [main] mainJob: About to join ioJob
I/MainActivity: [DefaultDispatcher-worker-3] ioJob: Delay done
I/MainActivity: [main] mainJob: ioJob successfully joined
I/MainActivity: [main] runBlocking: jobs joined, exiting

假设这个 runBlocking 代码是从主线程调用的,那么在 Dispatchers.Main 上启动的内部协程将永远不会到达主线程循环程序队列的前面,因为 runBlocking仍然阻塞主线程,因此协程永远无法启动。

当你使用Main.immediate时,启动协程的开头至少可以运行因为immediate运行协程最多如果您已经在主线程上,则第一个暂停点 ,如果您在主线程上启动 runBlocking。当它到达挂起函数调用 join() 时,它被放入主线程循环程序的队列中,你又回到了与上面相同的问题。挂起函数调用总是打断协程的继续。

不是在回答原来的问题,而是在质疑问题本身 ;-)。

基于闩锁的解决方案“假装”对两个“作业”使用并发,但只有 io 作业才这样做。第二个 Runnable 只是将代码包装到 运行 而什么都不做。

这是在做完全相同的事情,但更简单:

val latchMain = CountDownLatch(1)
val ioExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory())

ioExecutor.execute(
    Runnable {
        Thread.sleep(1000)
        latchMain.countDown()
    }
)

latchMain.await()

基于“相同”协程的实现非常简单:

runBlocking {
    val ioJob = launch(Dispatchers.IO) {
        delay(1000)
    }

    ioJob.join()
}

或者对 io 作业使用相同的线程池:

runBlocking {
    val ioJob = launch(ioExecutor.asCoroutineDispatcher()) {
        delay(1000)
    }

    ioJob.join()
}