使用 Monifu 的 Scala Rx Observable
Scala Rx Observable using Monifu
我只是想掌握冷热可观察对象之间的概念,并试用 Monifu 库。我的理解是以下代码应该导致只有一个订阅者获得 Observable 发出的事件,但事实并非如此!
scala> :paste
// Entering paste mode (ctrl-D to finish)
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
val obs = Observable.interval(1.second).take(10)
val x = obs.foreach(a => println(s"from x ${a}"))
val y = obs.foreach(a => println(s"from y ${a}"))
// Exiting paste mode, now interpreting.
from x 0
from y 0
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
obs: monifu.reactive.Observable[Long] = monifu.reactive.Observable$$anon@2c3c615d
x: Unit = ()
y: Unit = ()
scala> from x 1
from y 1
from x 2
from y 2
from x 3
from y 3
from x 4
from y 4
from x 5
from y 5
from x 6
from y 6
from x 7
from y 7
from x 8
from y 8
from x 9
from y 9
那么,这对我来说似乎是 Observable 正在向所有感兴趣的订阅者发布事件?
我是 Monifu 的主要作者。
一个 cold observable 意味着它的订阅函数为每个订阅者启动一个新的数据源(在每个 subscribe()
调用上),而 hot observable 正在多个订阅者之间共享同一个数据源。
例如,将文件视为数据源。让我们对一个简单的 Observable 进行建模,它从文件中发出行:
def fromFile(file: File): Observable[String] = {
// this is the subscribe function that
// we are passing to create ;-)
Observable.create { subscriber =>
// executing things on our thread-pool
subscriber.scheduler.execute {
val source = try {
Observable.fromIterable(scala.io.Source
.fromFile(file).getLines().toIterable)
}
catch {
// subscribe functions must be protected
case NonFatal(ex) =>
Observable.error(ex)
}
source.unsafeSubscribe(subscriber)
}
}
}
此函数创建一个冷可观察对象。这意味着它将为每个订阅的观察者打开一个新的文件句柄,然后为每个订阅的观察者读取并发出行。
但是我们可以把它变成一个热的可观察对象:
// NOTE: publish() turns a cold observable into a hot one
val hotObservable = fromFile(file).publish()
然后不同之处在于你这样做的时候:
val x = observable.subscribe()
val y = observable.subscribe()
如果 Observable 很热:
- 在您调用
connect()
之前,observable 不会执行任何操作
- 在
connect()
之后,同一个文件被打开,两者将收到完全相同的事件
- 发出该文件的所有行后,新订阅者将什么也得不到,因为(共享)数据源已经耗尽
如果 Observable 是冷的:
- 在每次订阅时,都会打开并读取一个新的文件句柄
- 元素在
subscribe()
之后立即发出,因此无需等待 connect()
- 所有订阅的观察者都将收到该文件中的所有行,无论他们何时订阅
一些也适用于 Monifu 的参考资料:
我只是想掌握冷热可观察对象之间的概念,并试用 Monifu 库。我的理解是以下代码应该导致只有一个订阅者获得 Observable 发出的事件,但事实并非如此!
scala> :paste
// Entering paste mode (ctrl-D to finish)
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
val obs = Observable.interval(1.second).take(10)
val x = obs.foreach(a => println(s"from x ${a}"))
val y = obs.foreach(a => println(s"from y ${a}"))
// Exiting paste mode, now interpreting.
from x 0
from y 0
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
obs: monifu.reactive.Observable[Long] = monifu.reactive.Observable$$anon@2c3c615d
x: Unit = ()
y: Unit = ()
scala> from x 1
from y 1
from x 2
from y 2
from x 3
from y 3
from x 4
from y 4
from x 5
from y 5
from x 6
from y 6
from x 7
from y 7
from x 8
from y 8
from x 9
from y 9
那么,这对我来说似乎是 Observable 正在向所有感兴趣的订阅者发布事件?
我是 Monifu 的主要作者。
一个 cold observable 意味着它的订阅函数为每个订阅者启动一个新的数据源(在每个 subscribe()
调用上),而 hot observable 正在多个订阅者之间共享同一个数据源。
例如,将文件视为数据源。让我们对一个简单的 Observable 进行建模,它从文件中发出行:
def fromFile(file: File): Observable[String] = {
// this is the subscribe function that
// we are passing to create ;-)
Observable.create { subscriber =>
// executing things on our thread-pool
subscriber.scheduler.execute {
val source = try {
Observable.fromIterable(scala.io.Source
.fromFile(file).getLines().toIterable)
}
catch {
// subscribe functions must be protected
case NonFatal(ex) =>
Observable.error(ex)
}
source.unsafeSubscribe(subscriber)
}
}
}
此函数创建一个冷可观察对象。这意味着它将为每个订阅的观察者打开一个新的文件句柄,然后为每个订阅的观察者读取并发出行。
但是我们可以把它变成一个热的可观察对象:
// NOTE: publish() turns a cold observable into a hot one
val hotObservable = fromFile(file).publish()
然后不同之处在于你这样做的时候:
val x = observable.subscribe()
val y = observable.subscribe()
如果 Observable 很热:
- 在您调用
connect()
之前,observable 不会执行任何操作 - 在
connect()
之后,同一个文件被打开,两者将收到完全相同的事件 - 发出该文件的所有行后,新订阅者将什么也得不到,因为(共享)数据源已经耗尽
如果 Observable 是冷的:
- 在每次订阅时,都会打开并读取一个新的文件句柄
- 元素在
subscribe()
之后立即发出,因此无需等待connect()
- 所有订阅的观察者都将收到该文件中的所有行,无论他们何时订阅
一些也适用于 Monifu 的参考资料: