monix 中 akka-streams 的 alsoTo 模拟是什么?
What is `alsoTo` analogue of akka-streams in monix ?
Monix 看起来是个很棒的框架,但文档非常稀少。
什么是 alsoTo
monix 中 akka-streams 的类似物?
基本上我希望流被两个消费者消费。
Monix 遵循 Rx 模型,因为订阅是动态的。任何 Observable
支持无限数量的订阅者:
val obs = Observable.interval(1.second)
val s1 = obs.dump("O1").subscribe()
val s2 = obs.dump("O2").subscribe()
但是有一个问题 — 默认情况下 Observable
是所谓的 "cold data source",这意味着每个订阅者都有自己的数据源。
例如,如果您有一个从 File
读取的 Observable
,那么每个订阅者都会获得自己的文件句柄。
为了"share"这样一个Observable
在多个订阅者之间,你必须把它转换成热数据源,共享它。您可以使用 multicast
运算符及其版本来执行此操作,最常用的是 publish
。这些返回 ConnectableObservable
,需要 connect()
调用才能开始流式传输:
val shared = obs.publish
// Nothing happens here:
val s1 = shared.dump("O1").subscribe()
val s2 = shared.dump("O2").subscribe()
// Starts actual streaming
val cancelable = shared.connect()
// You can subscribe after connect(), but you might lose events:
val s3 = shared.dump("O3").subscribe()
// You can unsubscribe one of your subscribers, but the
// data source keeps the stream active for the others
s1.cancel()
// To cancel the connection for all subscribers:
cancelable.cancel()
PS:monix.io 正在进行中,欢迎 PR
Monix 看起来是个很棒的框架,但文档非常稀少。
什么是 alsoTo
monix 中 akka-streams 的类似物?
基本上我希望流被两个消费者消费。
Monix 遵循 Rx 模型,因为订阅是动态的。任何 Observable
支持无限数量的订阅者:
val obs = Observable.interval(1.second)
val s1 = obs.dump("O1").subscribe()
val s2 = obs.dump("O2").subscribe()
但是有一个问题 — 默认情况下 Observable
是所谓的 "cold data source",这意味着每个订阅者都有自己的数据源。
例如,如果您有一个从 File
读取的 Observable
,那么每个订阅者都会获得自己的文件句柄。
为了"share"这样一个Observable
在多个订阅者之间,你必须把它转换成热数据源,共享它。您可以使用 multicast
运算符及其版本来执行此操作,最常用的是 publish
。这些返回 ConnectableObservable
,需要 connect()
调用才能开始流式传输:
val shared = obs.publish
// Nothing happens here:
val s1 = shared.dump("O1").subscribe()
val s2 = shared.dump("O2").subscribe()
// Starts actual streaming
val cancelable = shared.connect()
// You can subscribe after connect(), but you might lose events:
val s3 = shared.dump("O3").subscribe()
// You can unsubscribe one of your subscribers, but the
// data source keeps the stream active for the others
s1.cancel()
// To cancel the connection for all subscribers:
cancelable.cancel()
PS:monix.io 正在进行中,欢迎 PR