Why Monix Observable produces one element more than needed
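I'm playing with Monix streams and have an example in which I build an Observable from an Iterator. It seems to me that, when run, it produces one element more than I expect. The following code demonstrates this: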
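    // Imports assumed from the identifiers used below (Monix 3.x package layout)
    import monix.eval.Task
    import monix.execution.Scheduler.Implicits.global
    import monix.execution.atomic.AtomicLong
    import monix.reactive.{Consumer, Observable}
    import scala.concurrent.duration._

    // `logger` is assumed to be an SLF4J-style logger already in scope
    val count = AtomicLong(0)

    // Increments the counter and logs the value that is about to be produced
    def produceValue(): Long = {
      count.transformAndGet { i =>
        logger.info(s"Producing value: ${i + 1}")
        i + 1
      }
    }

    def more(): Boolean = count.get < 20

    lazy val iter = new Iterator[Long] {
      override def hasNext: Boolean = more()
      override def next(): Long = produceValue()
    }

    Observable
      .fromIterator(iter)
      // Up to 5 tasks run concurrently; each one is delayed by 3 seconds
      .mapParallelUnordered(5) { x =>
        Task(x)
          .foreachL { x =>
            logger.info(s"Transforming $x")
          }
          .delayExecution(3.seconds)
      }
      .consumeWith(Consumer.complete)
      .runAsync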
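The case is simple: there is an Iterator that logs a message each time it produces its next value. The downstream stage runs simple delayed tasks with a parallelism of 5, so that I can see what is going on. The output is as follows: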
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 1
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 2
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 3
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 4
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 5
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 4
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 3
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 5
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 2
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 1
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 17
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 17
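As you can see, the stream initially produces 6 elements, whereas I expected only 5 (since the downstream stage mapParallelUnordered needs only 5 elements). This is not a big deal, but I would like to understand why it happens.
Also, why are the initial values generated on the main thread, while the subsequent values are produced on the execution-context thread pool? Shouldn't everything use the scheduler that runs the entire stream?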
As you can see, initially the stream produces 6 elements
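The low-level communication protocol is designed around Subscriber and its (inherited) method onNext, which has the following signature:
    def onNext(elem: A): Future[Ack]
If we think of both creation and transformation as stages, the source observable (fromIterator in your case) pushes a value to its subscriber and, once that value has been acknowledged, pushes the next one.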
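So what happens is:
- The fromIterator stage generates value 1
- Value 1 is pushed to the mapAsyncUnordered stage (mapParallelUnordered in your code), where it is accepted because there are free workers, so the acknowledgement is an immediate Continue
- The steps above are repeated for values 2-5
- The fromIterator stage generates value 6 (this is when you see the log output)
- Value 6 is pushed to the mapAsyncUnordered stage. This time it cannot be accepted immediately, so the acknowledgement is a Continue that only arrives some time later. Until then, fromIterator does not generate any more values
Note that the mapAsyncUnordered stage does not pull values from fromIterator. Instead, fromIterator generates the values itself, and it cannot know in advance whether the downstream transformation will accept a value immediately.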
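The steps above can be reproduced without Monix. The following is a minimal sketch in plain Scala, not Monix's actual implementation: `Ack`, `Continue`, `BoundedStage` and `run` are hypothetical names that only mimic the shape of the `onNext(elem): Future[Ack]` contract, and the 300 ms sleep stands in for the delayed task. The source loop keeps pushing while acknowledgements are already complete and stops as soon as one is not.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise, blocking}
import scala.concurrent.duration._

object PushProtocolSketch {
  sealed trait Ack
  case object Continue extends Ack

  /** Hypothetical stand-in for a stage that runs at most `parallelism` tasks at once. */
  final class BoundedStage(parallelism: Int)(work: Long => Future[Unit])(
      implicit ec: ExecutionContext) {
    private var free = parallelism
    private var parked: Option[(Long, Promise[Ack])] = None

    // Accepts the element at once if a worker is free, otherwise parks it
    // and hands back an acknowledgement that completes later.
    def onNext(elem: Long): Future[Ack] = synchronized {
      if (free > 0) { free -= 1; start(elem); Future.successful(Continue) }
      else { val p = Promise[Ack](); parked = Some((elem, p)); p.future }
    }

    private def start(elem: Long): Unit = work(elem).onComplete(_ => release())

    // A worker finished: start the parked element if there is one, else free the slot.
    private def release(): Unit = synchronized {
      parked match {
        case Some((elem, p)) => parked = None; start(elem); p.success(Continue)
        case None            => free += 1
      }
    }
  }

  /** Returns (elements produced before the source first had to wait, total produced). */
  def run(total: Int = 20, parallelism: Int = 5): (Int, Int) = {
    import ExecutionContext.Implicits.global
    val stage = new BoundedStage(parallelism)(_ => Future(blocking(Thread.sleep(300))))
    val done = Promise[Unit]()
    var produced = 0
    var beforeFirstWait = -1

    def loop(): Unit = {
      var keepGoing = true
      while (keepGoing) {
        if (produced >= total) { done.success(()); keepGoing = false }
        else {
          produced += 1 // the element is generated BEFORE the stage is asked
          val ack = stage.onNext(produced.toLong)
          if (!ack.isCompleted) {
            if (beforeFirstWait < 0) beforeFirstWait = produced
            ack.onComplete(_ => loop()) // resume only once acknowledged
            keepGoing = false
          }
        }
      }
    }

    loop()
    Await.result(done.future, 30.seconds)
    (beforeFirstWait, produced)
  }
}
```

With the default arguments, `PushProtocolSketch.run()` should report that 6 elements were produced before the first wait: five are accepted immediately, and the sixth has already been generated by the time the stage turns out to be full.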
Shouldn't all be using scheduler that is used to run entire stream?
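For performance reasons, Monix Observable tries to work synchronously whenever possible (switching threads is expensive). In general, you cannot tell whether an operation will execute on the same thread unless you control it explicitly with methods such as executeAsync and executeOn.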
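One way to picture this is a source that continues on the calling thread while acknowledgements are already complete, and resumes from a callback once it has to wait. The sketch below uses only the Scala standard library. `ThreadHopSketch` and `threadAfter` are hypothetical names, and this is an illustration of the idea rather than Monix's code. It is consistent with the log in the question, where values 1-6 are produced on main and later ones on pool threads.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ThreadHopSketch {
  /** Runs the next step once `ack` allows it and reports the thread that ran it. */
  def threadAfter(ack: Future[Unit]): String = {
    val ranOn = Promise[String]()
    def step(): Unit = ranOn.success(Thread.currentThread.getName)

    if (ack.isCompleted) step()      // fast path: stay on the caller's thread
    else ack.onComplete(_ => step()) // slow path: resume on a pool thread

    Await.result(ranOn.future, 5.seconds)
  }
}
```

Calling `threadAfter(Future.successful(()))` reports the caller's own thread, while `threadAfter(Future(Thread.sleep(100)))` reports a thread from the global pool, because the step only runs once the pending acknowledgement completes.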