RxSwift - 如何从一个上游创建两个流

RxSwift - How to create two streams from one upstream

背景

我正在尝试观察一个 Int 流(实际上我不是,但为了使论证更容易)并在将该流与多个其他流组合的同时对其进行一些处理,比如 String 流和一个 Double 流,如下所示:

// RxSwift
let intStream = BehaviorSubject<Int>(value: 0) // subscribe to this later on
let sharedStream = intStream.share()
let mappedStream = sharedStream.map { ... }.share()
let combinedStream1 = Observable.combineLatest(sharedStream, stringStream).map { ... }
let combinedStream2 = Observable.combineLatest(sharedStream, doubleStream).map { ... }

上面的代码只是为了演示我正在尝试做的事情。上面的代码是视图模型代码的一部分(MVVM的VM部分),只有第一个map(for mappedStream)运行,其他的没有被调用。

问题

上述方法有什么问题,我该如何实现我想要做的事情? 另外,有没有更好的方法达到同样的效果?

更新

好的,我有一个答案,但它有点复杂...一个问题是您在视图模型中使用了 Subject,但我暂时忽略它。真正的问题来自于您不恰当地使用热可观察对象(share() 使流变热),因此事件被丢弃。

如果您在此代码上放一堆 .debug() 可能会有所帮助,这样您就可以跟进了。但这是本质...

当您订阅 mappedStream 时,它订阅了 share,后者又订阅了 sharedStream,后者又订阅了 intStream。然后 intStream 发出 0,0 沿着链向下并显示在观察者中。

然后你订阅了combinedStream1,它订阅了sharedStream的share()。由于此共享已被订阅,订阅将停在那里,并且由于共享已输出它的下一个事件,combinedStream1 不会获得 .next(0) 事件。

combinedStream2 相同。

去掉所有的 share(),一切都会正常:

let intStream = BehaviorSubject<Int>(value: 0) // subscribe to this later on
let mappedStream = intStream.map { [=10=] }
let combinedStream1 = Observable.combineLatest(intStream, stringStream).map { [=10=] }
let combinedStream2 = Observable.combineLatest(intStream, doubleStream).map { [=10=] }

这样,intStream 的每个订阅者都会获得 0 值。

你唯一想分享的时候是你需要分享副作用的时候。这段代码没有任何副作用,所以没有必要分享。