Monix Task.sleep 和单线程执行
Monix Task.sleep and single thread execution
我正在尝试理解 Monix 中的任务调度原则。
如预期的那样,以下代码(来源:https://slides.com/avasil/fp-concurrency-scalamatsuri2019#/4/3)仅生成“1”。
val s1: Scheduler = Scheduler(
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()),
ExecutionModel.SynchronousExecution)
def repeat(id: Int): Task[Unit] =
Task(println(s"$id ${Thread.currentThread().getName}")) >> repeat(id)
val prog: Task[(Unit, Unit)] = (repeat(1), repeat(2)).parTupled
prog.runToFuture(s1)
// Output:
// 1 pool-1-thread-1
// 1 pool-1-thread-1
// 1 pool-1-thread-1
// ...
当我们将Task.sleep
添加到repeat
方法时
def repeat(id: Int): Task[Unit] =
Task(println(s"$id ${Thread.currentThread().getName}")) >>
Task.sleep(1.millis) >> repeat(id)
输出变为
// Output
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// ...
这两个任务现在在一个线程上同时执行!好的 :)
一些合作屈服已经开始。这里究竟发生了什么?谢谢:)
编辑:Task.shift
而不是 Task.sleep
。
也会发生同样的情况
我不确定这是否是您要找的答案,但答案是:
尽管命名另有所指,但 Task.sleep
无法与 Thread.sleep
等更传统的方法进行比较。
Task.sleep
实际上并不是在线程上 运行,而是简单地指示调度程序在经过的时间后 运行 回调。
这里有一段来自 monix/TaskSleep.scala
的代码片段用于比较:
[...]
implicit val s = ctx.scheduler
val c = TaskConnectionRef()
ctx.connection.push(c.cancel)
c := ctx.scheduler.scheduleOnce(
timespan.length,
timespan.unit,
new SleepRunnable(ctx, cb)
)
[...]
private final class SleepRunnable(ctx: Context, cb: Callback[Throwable, Unit]) extends Runnable {
def run(): Unit = {
ctx.connection.pop()
// We had an async boundary, as we must reset the frame
ctx.frameRef.reset()
cb.onSuccess(())
}
}
[...]
在执行回调(此处:cb
)之前的时间段内,您的单线程调度程序(此处:ctx.scheduler
)可以简单地使用他的线程进行接下来排队的任何计算。
这也解释了为什么这种方法更可取,因为我们不会在睡眠间隔期间阻塞线程 - 浪费更少的计算周期。
希望这对您有所帮助。
扩展 Markus 的回答。
作为心智模型(为了便于说明),您可以将线程池想象成一个堆栈。因为,你只有一个执行线程池,它会先尝试 运行 repeat1
然后 repeat2
.
在内部,一切都只是一个巨人FlatMap
。 运行 循环将根据执行模型安排所有任务。
发生的事情是,sleep
将 运行nable 调度到线程池。它将 运行nable (repeat1
) 推到堆栈的顶部,从而为 repeat2
提供了 运行 的机会。 repeat2
.
也会发生同样的事情
请注意,默认情况下,Monix 的执行模型将为每个 1024 平面图执行异步边界。
我正在尝试理解 Monix 中的任务调度原则。 如预期的那样,以下代码(来源:https://slides.com/avasil/fp-concurrency-scalamatsuri2019#/4/3)仅生成“1”。
val s1: Scheduler = Scheduler(
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()),
ExecutionModel.SynchronousExecution)
def repeat(id: Int): Task[Unit] =
Task(println(s"$id ${Thread.currentThread().getName}")) >> repeat(id)
val prog: Task[(Unit, Unit)] = (repeat(1), repeat(2)).parTupled
prog.runToFuture(s1)
// Output:
// 1 pool-1-thread-1
// 1 pool-1-thread-1
// 1 pool-1-thread-1
// ...
当我们将Task.sleep
添加到repeat
方法时
def repeat(id: Int): Task[Unit] =
Task(println(s"$id ${Thread.currentThread().getName}")) >>
Task.sleep(1.millis) >> repeat(id)
输出变为
// Output
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// ...
这两个任务现在在一个线程上同时执行!好的 :) 一些合作屈服已经开始。这里究竟发生了什么?谢谢:)
编辑:Task.shift
而不是 Task.sleep
。
我不确定这是否是您要找的答案,但答案是:
尽管命名另有所指,但 Task.sleep
无法与 Thread.sleep
等更传统的方法进行比较。
Task.sleep
实际上并不是在线程上 运行,而是简单地指示调度程序在经过的时间后 运行 回调。
这里有一段来自 monix/TaskSleep.scala
的代码片段用于比较:
[...]
implicit val s = ctx.scheduler
val c = TaskConnectionRef()
ctx.connection.push(c.cancel)
c := ctx.scheduler.scheduleOnce(
timespan.length,
timespan.unit,
new SleepRunnable(ctx, cb)
)
[...]
private final class SleepRunnable(ctx: Context, cb: Callback[Throwable, Unit]) extends Runnable {
def run(): Unit = {
ctx.connection.pop()
// We had an async boundary, as we must reset the frame
ctx.frameRef.reset()
cb.onSuccess(())
}
}
[...]
在执行回调(此处:cb
)之前的时间段内,您的单线程调度程序(此处:ctx.scheduler
)可以简单地使用他的线程进行接下来排队的任何计算。
这也解释了为什么这种方法更可取,因为我们不会在睡眠间隔期间阻塞线程 - 浪费更少的计算周期。
希望这对您有所帮助。
扩展 Markus 的回答。
作为心智模型(为了便于说明),您可以将线程池想象成一个堆栈。因为,你只有一个执行线程池,它会先尝试 运行 repeat1
然后 repeat2
.
在内部,一切都只是一个巨人FlatMap
。 运行 循环将根据执行模型安排所有任务。
发生的事情是,sleep
将 运行nable 调度到线程池。它将 运行nable (repeat1
) 推到堆栈的顶部,从而为 repeat2
提供了 运行 的机会。 repeat2
.
请注意,默认情况下,Monix 的执行模型将为每个 1024 平面图执行异步边界。