RxSwift - 如何从一个上游创建两个流
RxSwift - How to create two streams from one upstream
背景
我正在尝试观察一个 Int
流(实际上我不是,但为了使论证更容易)并在将该流与多个其他流组合的同时对其进行一些处理,比如 String
流和一个 Double
流,如下所示:
// RxSwift
let intStream = BehaviorSubject<Int>(value: 0) // subscribe to this later on
let sharedStream = intStream.share()
let mappedStream = sharedStream.map { ... }.share()
let combinedStream1 = Observable.combineLatest(sharedStream, stringStream).map { ... }
let combinedStream2 = Observable.combineLatest(sharedStream, doubleStream).map { ... }
上面的代码只是为了演示我正在尝试做的事情。上面的代码是视图模型代码的一部分(MVVM的VM部分),只有第一个map
(for mappedStream
)运行,其他的没有被调用。
问题
上述方法有什么问题,我该如何实现我想要做的事情?
另外,有没有更好的方法达到同样的效果?
更新
- 我确认将重放计数设置为
1
可以正常工作。但是为什么?
- 上面的代码都进入了视图模型的初始化阶段,订阅发生在之后。
好的,我有一个答案,但它有点复杂...一个问题是您在视图模型中使用了 Subject,但我暂时忽略它。真正的问题来自于您不恰当地使用热可观察对象(share()
使流变热),因此事件被丢弃。
如果您在此代码上放一堆 .debug()
可能会有所帮助,这样您就可以跟进了。但这是本质...
当您订阅 mappedStream
时,它订阅了 share
,后者又订阅了 sharedStream,后者又订阅了 intStream。然后 intStream 发出 0,0 沿着链向下并显示在观察者中。
然后你订阅了combinedStream1
,它订阅了sharedStream的share()。由于此共享已被订阅,订阅将停在那里,并且由于共享已输出它的下一个事件,combinedStream1
不会获得 .next(0) 事件。
与 combinedStream2
相同。
去掉所有的 share()
,一切都会正常:
let intStream = BehaviorSubject<Int>(value: 0) // subscribe to this later on
let mappedStream = intStream.map { [=10=] }
let combinedStream1 = Observable.combineLatest(intStream, stringStream).map { [=10=] }
let combinedStream2 = Observable.combineLatest(intStream, doubleStream).map { [=10=] }
这样,intStream
的每个订阅者都会获得 0
值。
你唯一想分享的时候是你需要分享副作用的时候。这段代码没有任何副作用,所以没有必要分享。
背景
我正在尝试观察一个 Int
流(实际上我不是,但为了使论证更容易)并在将该流与多个其他流组合的同时对其进行一些处理,比如 String
流和一个 Double
流,如下所示:
// RxSwift
let intStream = BehaviorSubject<Int>(value: 0) // subscribe to this later on
let sharedStream = intStream.share()
let mappedStream = sharedStream.map { ... }.share()
let combinedStream1 = Observable.combineLatest(sharedStream, stringStream).map { ... }
let combinedStream2 = Observable.combineLatest(sharedStream, doubleStream).map { ... }
上面的代码只是为了演示我正在尝试做的事情。上面的代码是视图模型代码的一部分(MVVM的VM部分),只有第一个map
(for mappedStream
)运行,其他的没有被调用。
问题
上述方法有什么问题,我该如何实现我想要做的事情? 另外,有没有更好的方法达到同样的效果?
更新
- 我确认将重放计数设置为
1
可以正常工作。但是为什么? - 上面的代码都进入了视图模型的初始化阶段,订阅发生在之后。
好的,我有一个答案,但它有点复杂...一个问题是您在视图模型中使用了 Subject,但我暂时忽略它。真正的问题来自于您不恰当地使用热可观察对象(share()
使流变热),因此事件被丢弃。
如果您在此代码上放一堆 .debug()
可能会有所帮助,这样您就可以跟进了。但这是本质...
当您订阅 mappedStream
时,它订阅了 share
,后者又订阅了 sharedStream,后者又订阅了 intStream。然后 intStream 发出 0,0 沿着链向下并显示在观察者中。
然后你订阅了combinedStream1
,它订阅了sharedStream的share()。由于此共享已被订阅,订阅将停在那里,并且由于共享已输出它的下一个事件,combinedStream1
不会获得 .next(0) 事件。
与 combinedStream2
相同。
去掉所有的 share()
,一切都会正常:
let intStream = BehaviorSubject<Int>(value: 0) // subscribe to this later on
let mappedStream = intStream.map { [=10=] }
let combinedStream1 = Observable.combineLatest(intStream, stringStream).map { [=10=] }
let combinedStream2 = Observable.combineLatest(intStream, doubleStream).map { [=10=] }
这样,intStream
的每个订阅者都会获得 0
值。
你唯一想分享的时候是你需要分享副作用的时候。这段代码没有任何副作用,所以没有必要分享。