并行处理两个 Observable 并转换为单个 Observable
Parallel processing of two Observables and convert to a single Observable
我有两个从单个 Observable 创建的 Observable,如下所示
import monix.reactive.Observable
import scala.collection.immutable
val a: immutable.Seq[(String, String)] = (0 to 10).toList.map(x =>(s"left-$x", s"right-$x"))
val originalStream: Observable[(String, String)] = Observable.fromIterable(a)
val leftStream: Observable[String] = originalStream.map(_._1).map(println)
val rightStream: Observable[String] = originalStream.map(_._2).map(println)
现在我如何运行 并行地使用 leftStream 和 rightStream 并将它们组合在一起以获得一个我可以订阅的新 Observable?执行 Observable.merge 是按顺序执行它们。
执行 leftStream.zip(rightStream)
并且应该为两个流提供背压。
但是使用 map 从 originalStream 创建 leftStream 和 rightStream 意味着 originalStream 中的元素将转到这些流之一,而不是两个。
我有两个从单个 Observable 创建的 Observable,如下所示
import monix.reactive.Observable
import scala.collection.immutable
val a: immutable.Seq[(String, String)] = (0 to 10).toList.map(x =>(s"left-$x", s"right-$x"))
val originalStream: Observable[(String, String)] = Observable.fromIterable(a)
val leftStream: Observable[String] = originalStream.map(_._1).map(println)
val rightStream: Observable[String] = originalStream.map(_._2).map(println)
现在我如何运行 并行地使用 leftStream 和 rightStream 并将它们组合在一起以获得一个我可以订阅的新 Observable?执行 Observable.merge 是按顺序执行它们。
执行 leftStream.zip(rightStream)
并且应该为两个流提供背压。
但是使用 map 从 originalStream 创建 leftStream 和 rightStream 意味着 originalStream 中的元素将转到这些流之一,而不是两个。