monix:Task.executeWithFork 阻止执行?
monix: Task.executeWithFork prevents execution?
我不明白为什么在以下示例中添加 executeWithFork
会阻止 运行 的任务:
import java.util.concurrent.TimeUnit
import monix.execution.schedulers.SchedulerService
import monix.reactive.subjects.ConcurrentSubject
object Sandbox {
def main(args: Array[String]): Unit = {
implicit val scheduler: SchedulerService =
monix.execution.Scheduler(java.util.concurrent.Executors.newCachedThreadPool())
val input = ConcurrentSubject.publish[String]
// prints nothing
input.foreachL(println).executeWithFork.runAsync
// this works:
// input.foreachL(println).runAsync
input.onNext("one")
input.onNext("two")
scheduler.shutdown()
scheduler.awaitTermination(1, TimeUnit.MINUTES, monix.execution.Scheduler.Implicits.global)
}
}
您看到的行为是两个事实的结果:
使用 executeWithFork
为线程切换引入一点额外延迟
您使用 ConcurrentSubject.publish
(与 replay
相对)。如果您打开 PublishSubject
的文档,您可能会看到
A PublishSubject
emits to a subscriber only those items that are emitted by the source subsequent to the time of the subscription.
换句话说,您在发布 "one"
和 "two"
的主线程与必须订阅 input
以获取数据的分叉线程之间存在竞争条件.结果取决于哪个线程赢得比赛:订阅前发布的所有数据都将丢失。在我的硬件中,我几乎总是看到 "two"
,偶尔甚至会看到 "one"
,您的结果可能会有所不同。
最简单的测试方法是在第一个 input.onNext
之前添加 Thread.sleep(100)
,您应该会看到每次都打印这两个事件。您也可以尝试推送比 2 个更多的事件,以确保不会丢失所有内容。
我不明白为什么在以下示例中添加 executeWithFork
会阻止 运行 的任务:
import java.util.concurrent.TimeUnit
import monix.execution.schedulers.SchedulerService
import monix.reactive.subjects.ConcurrentSubject
object Sandbox {
def main(args: Array[String]): Unit = {
implicit val scheduler: SchedulerService =
monix.execution.Scheduler(java.util.concurrent.Executors.newCachedThreadPool())
val input = ConcurrentSubject.publish[String]
// prints nothing
input.foreachL(println).executeWithFork.runAsync
// this works:
// input.foreachL(println).runAsync
input.onNext("one")
input.onNext("two")
scheduler.shutdown()
scheduler.awaitTermination(1, TimeUnit.MINUTES, monix.execution.Scheduler.Implicits.global)
}
}
您看到的行为是两个事实的结果:
使用
executeWithFork
为线程切换引入一点额外延迟您使用
ConcurrentSubject.publish
(与replay
相对)。如果您打开PublishSubject
的文档,您可能会看到
A
PublishSubject
emits to a subscriber only those items that are emitted by the source subsequent to the time of the subscription.
换句话说,您在发布 "one"
和 "two"
的主线程与必须订阅 input
以获取数据的分叉线程之间存在竞争条件.结果取决于哪个线程赢得比赛:订阅前发布的所有数据都将丢失。在我的硬件中,我几乎总是看到 "two"
,偶尔甚至会看到 "one"
,您的结果可能会有所不同。
最简单的测试方法是在第一个 input.onNext
之前添加 Thread.sleep(100)
,您应该会看到每次都打印这两个事件。您也可以尝试推送比 2 个更多的事件,以确保不会丢失所有内容。