Monix: InputStreamObservable does not support multiple subscribers
I am trying to split an Observable of (String, Date) into two different Observables and zip them together, as follows:
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

// Source built from a one-shot Iterator
val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).toIterator)
val y = Observable.toReactive(x)
// Both streams subscribe to the same publisher y, which is what triggers the exception
val fileStream = Observable.fromReactivePublisher(y).mapAsync(5)(a => Task{println(a._1); a._1})
val dateStream = Observable.fromReactivePublisher(y).mapAsync(5)(a => Task{println(a._2); a._2})
fileStream.zip(dateStream)
  .map(println)
  .subscribe()
But I get the following exception:
monix.reactive.exceptions.MultipleSubscribersException: InputStreamObservable does not support multiple subscribers
at monix.reactive.exceptions.MultipleSubscribersException$.build(MultipleSubscribersException.scala:51)
at monix.reactive.internal.builders.IteratorAsObservable.unsafeSubscribeFn(IteratorAsObservable.scala:42)
at monix.reactive.Observable$$anon.subscribe(Observable.scala:155)
at monix.reactive.internal.builders.ReactiveObservable.unsafeSubscribeFn(ReactiveObservable.scala:38)
at monix.reactive.internal.operators.MapAsyncParallelObservable.unsafeSubscribeFn(MapAsyncParallelObservable.scala:60)
at monix.reactive.internal.builders.Zip2Observable.unsafeSubscribeFn(Zip2Observable.scala:158)
at monix.reactive.Observable$$anon.unsafeSubscribeFn(Observable.scala:139)
at monix.reactive.Observable$class.subscribe(Observable.scala:71)
at monix.reactive.Observable$$anon.subscribe(Observable.scala:136)
at monix.reactive.Observable$class.subscribe(Observable.scala:90)
at monix.reactive.Observable$$anon.subscribe(Observable.scala:136)
at monix.reactive.Observable$class.subscribe(Observable.scala:120)
at monix.reactive.Observable$$anon.subscribe(Observable.scala:136)
at monix.reactive.Observable$class.subscribe(Observable.scala:112)
at monix.reactive.Observable$$anon.subscribe(Observable.scala:136)
Is the conversion to/from reactive mandatory?
One way to fix it is to use val x = Observable.fromIterable((0 to 10).map(i => (s"a $i", s"b $i"))), but for an infinite stream this will end in an OutOfMemoryError.
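As far as I can tell, fromIterable helps because an Iterable can hand a fresh iterator to every subscriber, whereas a single Iterator can be walked only once. Here is a minimal plain-Scala sketch of that difference. No Monix is involved, and the names IteratorVsIterable, drain, sharedIterator and freshIterators are made up for illustration:

```scala
object IteratorVsIterable {
  // Collects the first tuple component by walking the iterator with hasNext/next.
  private def drain(it: Iterator[(String, String)]): List[String] = {
    val out = List.newBuilder[String]
    while (it.hasNext) out += it.next()._1
    out.result()
  }

  // Two passes over the SAME iterator: the second pass finds nothing left.
  def sharedIterator(): (List[String], List[String]) = {
    val it = (0 to 2).map(i => (s"a $i", s"b $i")).iterator
    (drain(it), drain(it))
  }

  // Two passes over an Iterable: each pass asks for its own fresh iterator.
  def freshIterators(): (List[String], List[String]) = {
    val xs = (0 to 2).map(i => (s"a $i", s"b $i"))
    (drain(xs.iterator), drain(xs.iterator))
  }

  def main(args: Array[String]): Unit = {
    println(sharedIterator()) // (List(a 0, a 1, a 2),List())
    println(freshIterators()) // (List(a 0, a 1, a 2),List(a 0, a 1, a 2))
  }
}
```

The second "subscriber" to the shared iterator silently gets nothing, which is presumably why Monix rejects it with an exception instead.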
Another way is to use .multicast(Pipe.publish[]) and then call obs.connect(), as in the code below:
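import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Observable, Pipe}

val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).iterator)
val y = Observable.toReactive(x)
val obsY = Observable.fromReactivePublisher(y)
// Share a single upstream subscription among all downstream subscribers
val connectY = obsY.multicast(Pipe.publish[(String, String)])
val fileStream = connectY.mapAsync(5)(a => Task{println(a._1); a._1})
val dateStream = connectY.mapAsync(5)(a => Task{println(a._2); a._2})
fileStream.zip(dateStream)
  .map(println)
  .subscribe()
// Start the source only after the subscribers are attached
connectY.connect()
// Keep the main thread alive while the asynchronous pipeline runs
Thread.sleep(5000)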
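To see why the order "subscribe first, connect() last" matters, here is a rough plain-Scala sketch of the multicast idea. It is only an illustration under my own assumptions: Hub and HubDemo are hypothetical names, not Monix classes, and the sketch is synchronous and ignores back-pressure and error handling entirely.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a connectable, multicast source; not a Monix class.
final class Hub[A](source: Iterator[A]) {
  private val subscribers = ListBuffer.empty[A => Unit]

  // Registers a callback; nothing is emitted until connect() is called.
  def subscribe(f: A => Unit): Unit = subscribers += f

  // Drains the iterator exactly once, handing every element to every subscriber.
  def connect(): Unit = while (source.hasNext) {
    val a = source.next()
    subscribers.foreach(_(a))
  }
}

object HubDemo {
  def run(): (List[String], List[String]) = {
    val hub = new Hub((0 to 2).map(i => (s"a $i", s"b $i")).iterator)
    val files = ListBuffer.empty[String]
    val dates = ListBuffer.empty[String]
    hub.subscribe(a => files += a._1)
    hub.subscribe(a => dates += a._2)
    hub.connect() // both subscribers are attached, so both see every element
    (files.toList, dates.toList)
  }

  def main(args: Array[String]): Unit =
    println(run()) // (List(a 0, a 1, a 2),List(b 0, b 1, b 2))
}
```

If connect() were called before the second subscribe, the iterator would already be drained and that subscriber would see nothing.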
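In addition to sergei-shubin's answer, you can also temporarily turn the Observable into a "hot" observable with publishSelector, which lets you split it into multiple streams without handling a multicast manually. It looks like this: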
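import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

val x = Observable.fromIterator((0 to 10).map(i => (s"a $i", s"b $i")).toIterator)
// Inside the selector, o is a shared view of x that may be subscribed to more than once
val zipped = x.publishSelector { o =>
  val fileStream = o.mapParallelUnordered(5)(a => Task{println(a._1); a._1})
  val dateStream = o.mapParallelUnordered(5)(a => Task{println(a._2); a._2})
  fileStream.zip(dateStream)
}
zipped
  .map(println)
  .subscribe()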