如何在 scala rx 中调用 onNext

how does onNext is called in scala rx

我正在尝试了解 observable 的工作原理。这是我的代码。

def make: Stream[Int] = {
    Stream.cons(scala.util.Random.nextInt(10), {
      println("Making ..")
      Thread.sleep(1000)
      make
    })
  }

  val y = Observable.from(make)

  y.foreach(a => println(a))

emit 将每 1 秒产生一个新值。我正在用它做一个可观​​察的。 for each 循环将永远打印新产生的值。

据我了解,a=>println(a) 是一个回调值,在 rx observable 中调用 onNext(t) 。

我想弄清楚的是它是如何粘附到生产者的,所以当产生新的价值时,在哪里调用 onNext。我研究了一段时间的 rx 代码,但仍然无法弄清楚。

谢谢。

好像订阅了流调用rx.Observable的

Subscription subscribe(Subscriber<? super T> subscriber) 

这将调用 IterableProducer.request 方法 会有这段代码。

    if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
        // fast-path without backpressure
        while (it.hasNext()) {
            if (o.isUnsubscribed()) {
                return;
            }
            o.onNext(**it.next()**);
        }
        if (!o.isUnsubscribed()) {
            o.onCompleted();
        }
    }

onNext 是观察者代码,它的输入是生产者 it.next。就是这样粘的。