Why Monix Observable produces one element more than needed

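I'm playing with Monix streams and have an example where I build an Observable from an Iterator. When I run it, it seems to produce one element more than I expect. The following code demonstrates this: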

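  import scala.concurrent.duration._
  import monix.eval.Task
  import monix.execution.Scheduler.Implicits.global
  import monix.execution.atomic.AtomicLong
  import monix.reactive.{Consumer, Observable}

  // `logger` is the asker's SLF4J-style logger, defined elsewhere (not shown)
  val count = AtomicLong(0)

  // Increments the counter and logs every value handed to the stream
  def produceValue(): Long = {
    count.transformAndGet { i =>
      logger.info(s"Producing value: ${i + 1}")
      i + 1
    }
  }

  // The iterator is exhausted once 20 values have been produced
  def more(): Boolean = count.get < 20

  lazy val iter = new Iterator[Long] {
    override def hasNext: Boolean = more()
    override def next(): Long     = produceValue()
  }

  Observable
    .fromIterator(iter)
    .mapParallelUnordered(5) { x =>
      Task(x)
        .foreachL { x =>
          logger.info(s"Transforming $x")
        }
        .delayExecution(3.seconds)
    }
    .consumeWith(Consumer.complete)
    .runAsync // Monix 2.x API; Monix 3.x renamed this to runToFuture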

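The case is simple: an Iterator logs a message every time it produces the next value. The downstream stage runs a simple delayed task with a parallelism of 5, so I can see what happens. The output is: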

[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 1
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 2
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 3
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 4
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 5
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] -  Transforming 4
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] -  Transforming 3
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 5
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] -  Transforming 2
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Transforming 1
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] -  Transforming 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] -  Transforming 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Transforming 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] -  Transforming 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] -  Transforming 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] -  Transforming 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] -  Transforming 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Transforming 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 17
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Transforming 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] -  Transforming 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] -  Transforming 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] -  Transforming 17

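As you can see, the stream initially produces 6 elements, whereas I expected only 5 (since the downstream mapParallelUnordered stage only needs 5 elements). In practice this is no big deal, but I'd like to understand why it happens.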

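Also, why are the initial values produced on the main thread, while the later ones are produced on the execution-context thread pool? Shouldn't everything use the scheduler that runs the whole stream?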

As you can see, initially the stream produces 6 elements

The low-level communication protocol is designed around Subscriber and its (inherited) method onNext, which has the following signature:

def onNext(elem: A): Future[Ack]

(source)

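If we think of both creation and transformation as stages, the source observable (fromIterator in your case) pushes a value to its subscriber and, once that value is acknowledged, pushes the next one.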
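The push-and-acknowledge protocol can be modelled with plain scala.concurrent types. The sketch below is a simplified, hypothetical model, not Monix's real classes: Ack, Continue, Stop, SimpleSubscriber and PushSource.feed are illustrative names chosen for this example. The source calls it.next() again only after the previous onNext has been acknowledged with Continue, and an acknowledgement that is already completed is handled synchronously inside the loop.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

// Simplified stand-ins for Monix's Ack / Subscriber (hypothetical, not the real API).
sealed trait Ack
case object Continue extends Ack
case object Stop extends Ack

trait SimpleSubscriber[A] {
  // Each pushed element is answered with a Future[Ack].
  def onNext(elem: A): Future[Ack]
  def onComplete(): Unit
}

object PushSource {
  // Pushes the elements of `it` one at a time; it.next() is only called
  // again after the previous element was acknowledged with Continue.
  def feed[A](it: Iterator[A], out: SimpleSubscriber[A])(implicit ec: ExecutionContext): Unit = {
    var running = true
    while (running) {
      if (!it.hasNext) {
        out.onComplete()
        running = false
      } else {
        val ack = out.onNext(it.next())
        ack.value match {
          case Some(Success(Continue)) => ()              // already acknowledged: keep looping here
          case Some(_)                 => running = false // Stop or failure: stop pushing
          case None =>                                     // not acknowledged yet: resume later
            running = false
            ack.foreach {
              case Continue => feed(it, out)
              case Stop     => ()
            }
        }
      }
    }
  }
}
```

With a subscriber that always answers Future.successful(Continue), feed delivers every element and calls onComplete before it returns, without ever leaving the caller's thread.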

So here is what happens:

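  • The fromIterator stage produces value 1
  • Value 1 is pushed to the mapParallelUnordered stage, where it is accepted (because there is a free worker), so the acknowledgement is an immediate Continue
  • The same happens for values 2-5
  • The fromIterator stage produces value 6 (this is when you see the log line)
  • Value 6 is pushed to the mapParallelUnordered stage. This time all 5 workers are busy, so it cannot be accepted immediately, and onNext returns a Future[Ack] that completes with Continue only later, once a worker frees up. Until then, fromIterator produces no more values.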

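Note that the mapParallelUnordered stage does not pull values from fromIterator. Instead, fromIterator produces the values itself and pushes them, and it cannot know in advance whether the downstream transformation will accept a value immediately.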
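The sequence above can be reproduced with a small, single-threaded model. This is a sketch under stated assumptions: BoundedStage is a hypothetical stand-in for mapParallelUnordered(5) that acknowledges immediately while it has a free worker slot and returns a pending Future otherwise, and CountingSource mimics the question's counting iterator. Neither is Monix's actual implementation.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Success

// Simplified model of the push protocol (hypothetical names, not Monix internals).
sealed trait Ack
case object Continue extends Ack
case object Stop extends Ack

// Hypothetical stage with `parallelism` worker slots. While a slot is free it
// acknowledges immediately; otherwise it accepts the element but leaves the
// acknowledgement pending until release() frees a slot for it.
final class BoundedStage(parallelism: Int) {
  var inFlight = 0
  private var pending: Option[Promise[Ack]] = None

  def onNext(elem: Long): Future[Ack] =
    if (inFlight < parallelism) {
      inFlight += 1               // a free worker takes the element
      Future.successful(Continue) // immediate Continue
    } else {
      val p = Promise[Ack]()      // no free worker: acknowledge later
      pending = Some(p)
      p.future
    }

  // A worker finished: the held element takes its slot and the source may continue.
  def release(): Unit = pending match {
    case Some(p) =>
      pending = None
      p.success(Continue)
      ()
    case None =>
      inFlight -= 1
  }
}

// Mimics the question's iterator: counts how many values were produced.
final class CountingSource(limit: Long)(implicit ec: ExecutionContext) {
  var produced = 0L
  private val iter: Iterator[Long] = new Iterator[Long] {
    def hasNext: Boolean = produced < limit
    def next(): Long = { produced += 1; produced }
  }

  // Keeps producing and pushing while acknowledgements are already Continue.
  def feed(stage: BoundedStage): Unit = {
    var running = true
    while (running && iter.hasNext) {
      val ack = stage.onNext(iter.next())
      ack.value match {
        case Some(Success(Continue)) => ()
        case Some(_)                 => running = false
        case None =>
          running = false
          ack.foreach {
            case Continue => feed(stage)
            case Stop     => ()
          }
      }
    }
  }
}
```

After feed returns, the source has produced 6 values while only 5 are in flight, which matches the log in the question. Calling stage.release() would complete the pending Promise and resume the loop inside the Future callback, on an ExecutionContext thread rather than the caller's. The model uses plain vars and is not thread-safe; it only illustrates the ordering.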


Shouldn't all be using scheduler that is used to run entire stream?

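For performance reasons, Monix Observable tries to work synchronously as much as possible (switching threads is expensive). That is why values 1 to 6 are logged on main: runAsync starts the stream on the calling thread, and as long as each acknowledgement is an immediate Continue, the source keeps producing on that same thread. Only when an acknowledgement completes asynchronously (here, after a 3-second delayExecution finishes) does production resume on a thread of the Scheduler. In general, unless you control it explicitly with methods such as executeAsync or executeOn, you cannot tell whether an operation will run on the same thread.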
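The thread behaviour can be illustrated with a synchronous fast path over plain Scala Futures. This is only an analogy for what Monix does internally, not its code, and FastPath.whenAcked is a hypothetical helper. An already-completed acknowledgement lets the continuation run immediately on the current thread, while a pending one hands the continuation to the ExecutionContext, which runs it on one of its pool threads.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

// Illustration of a "synchronous fast path" using only scala.concurrent.
// Not Monix code; it only shows why work can stay on the caller's thread.
object FastPath {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // If `ack` is already completed successfully, run `k` right now on the
  // current thread. Otherwise register `k` as a callback; the
  // ExecutionContext runs it on one of its own threads once `ack` completes.
  def whenAcked[B](ack: Future[Unit])(k: () => B): Future[B] =
    ack.value match {
      case Some(Success(_)) => Future.successful(k())
      case _                => ack.map(_ => k())
    }

  def threadName(): String = Thread.currentThread.getName
}
```

In the question's run, the first six values are produced while every acknowledgement is already Continue, so the loop stays on main; the first pending acknowledgement completes later on a scheduler thread, and production continues there.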