monix:Task.executeWithFork 阻止执行?

monix: Task.executeWithFork prevents execution?

我不明白为什么在以下示例中添加 executeWithFork 会阻止 运行 的任务:

import java.util.concurrent.TimeUnit

import monix.execution.schedulers.SchedulerService
import monix.reactive.subjects.ConcurrentSubject

object Sandbox {

  def main(args: Array[String]): Unit = {
    implicit val scheduler: SchedulerService =
      monix.execution.Scheduler(java.util.concurrent.Executors.newCachedThreadPool())

    val input = ConcurrentSubject.publish[String]

    // prints nothing
    input.foreachL(println).executeWithFork.runAsync
    // this works:
    // input.foreachL(println).runAsync

    input.onNext("one")
    input.onNext("two")

    scheduler.shutdown()
    scheduler.awaitTermination(1, TimeUnit.MINUTES, monix.execution.Scheduler.Implicits.global)
  }
}

您看到的行为是两个事实的结果:

  1. 使用 executeWithFork 为线程切换引入一点额外延迟

  2. 您使用 ConcurrentSubject.publish(与 replay 相对)。如果您打开 PublishSubject 的文档,您可能会看到

A PublishSubject emits to a subscriber only those items that are emitted by the source subsequent to the time of the subscription.

换句话说,您在发布 "one""two" 的主线程与必须订阅 input 以获取数据的分叉线程之间存在竞争条件.结果取决于哪个线程赢得比赛:订阅前发布的所有数据都将丢失。在我的硬件中,我几乎总是看到 "two",偶尔甚至会看到 "one",您的结果可能会有所不同。

最简单的测试方法是在第一个 input.onNext 之前添加 Thread.sleep(100),您应该会看到每次都打印这两个事件。您也可以尝试推送比 2 个更多的事件,以确保不会丢失所有内容。