如何在 scala rx 中调用 onNext
how does onNext is called in scala rx
我正在尝试了解 observable 的工作原理。这是我的代码。
def make: Stream[Int] = {
Stream.cons(scala.util.Random.nextInt(10), {
println("Making ..")
Thread.sleep(1000)
make
})
}
val y = Observable.from(make)
y.foreach(a => println(a))
emit 将每 1 秒产生一个新值。我正在用它做一个可观察的。 for each 循环将永远打印新产生的值。
据我了解,a=>println(a) 是一个回调值,在 rx observable 中调用 onNext(t) 。
我想弄清楚的是它是如何粘附到生产者的,所以当产生新的价值时,在哪里调用 onNext。我研究了一段时间的 rx 代码,但仍然无法弄清楚。
谢谢。
好像订阅了流调用rx.Observable的
Subscription subscribe(Subscriber<? super T> subscriber)
这将调用 IterableProducer.request 方法
会有这段代码。
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
// fast-path without backpressure
while (it.hasNext()) {
if (o.isUnsubscribed()) {
return;
}
o.onNext(**it.next()**);
}
if (!o.isUnsubscribed()) {
o.onCompleted();
}
}
onNext 是观察者代码,它的输入是生产者 it.next。就是这样粘的。
我正在尝试了解 observable 的工作原理。这是我的代码。
def make: Stream[Int] = {
Stream.cons(scala.util.Random.nextInt(10), {
println("Making ..")
Thread.sleep(1000)
make
})
}
val y = Observable.from(make)
y.foreach(a => println(a))
emit 将每 1 秒产生一个新值。我正在用它做一个可观察的。 for each 循环将永远打印新产生的值。
据我了解,a=>println(a) 是一个回调值,在 rx observable 中调用 onNext(t) 。
我想弄清楚的是它是如何粘附到生产者的,所以当产生新的价值时,在哪里调用 onNext。我研究了一段时间的 rx 代码,但仍然无法弄清楚。
谢谢。
好像订阅了流调用rx.Observable的
Subscription subscribe(Subscriber<? super T> subscriber)
这将调用 IterableProducer.request 方法 会有这段代码。
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
// fast-path without backpressure
while (it.hasNext()) {
if (o.isUnsubscribed()) {
return;
}
o.onNext(**it.next()**);
}
if (!o.isUnsubscribed()) {
o.onCompleted();
}
}
onNext 是观察者代码,它的输入是生产者 it.next。就是这样粘的。