monix 中 akka-streams 的 alsoTo 模拟是什么?

What is `alsoTo` analogue of akka-streams in monix ?

Monix 看起来是个很棒的框架,但文档非常稀少。

什么是 alsoTo monix 中 akka-streams 的类似物?

基本上我希望流被两个消费者消费。

Monix 遵循 Rx 模型,因为订阅是动态的。任何 Observable 支持无限数量的订阅者:

val obs = Observable.interval(1.second)

val s1 = obs.dump("O1").subscribe()
val s2 = obs.dump("O2").subscribe()

但是有一个问题 — 默认情况下 Observable 是所谓的 "cold data source",这意味着每个订阅者都有自己的数据源。

例如,如果您有一个从 File 读取的 Observable,那么每个订阅者都会获得自己的文件句柄。

为了"share"这样一个Observable在多个订阅者之间,你必须把它转换成热数据源,共享它。您可以使用 multicast 运算符及其版本来执行此操作,最常用的是 publish。这些返回 ConnectableObservable,需要 connect() 调用才能开始流式传输:

val shared = obs.publish

// Nothing happens here:
val s1 = shared.dump("O1").subscribe()
val s2 = shared.dump("O2").subscribe()

// Starts actual streaming
val cancelable = shared.connect()

// You can subscribe after connect(), but you might lose events:
val s3 = shared.dump("O3").subscribe()

// You can unsubscribe one of your subscribers, but the
// data source keeps the stream active for the others
s1.cancel()

// To cancel the connection for all subscribers:
cancelable.cancel()

PS:monix.io 正在进行中,欢迎 PR