与多个订阅者一起调用流的一部分?
call part of stream once with multiple subscribers?
我有用于套接字通信的自定义 Rx 适配器。
在它之外,我观察带有消息的 Flowable。
然后我有一些管理器来处理每条消息,然后进一步发出它。
fun observeSocket() = socketManager
.observe()
.doOnNext{
insideMessageHandler.handle(it)
}
然后我有两个订阅者执行 observeSocket().subscribe()
问题是每条消息 insideMessageHandler.handle(it) 都会被调用两次。我想找到每个订阅者都可以使用部分流的方式。不幸的是,observeSocket() 末尾的 .share() 运算符不起作用。
我遇到过两次这样的事情:
/onNextInside
Flowable/-onNextOutsideSubscriber1
Flowable\-onNextOutsideSubscriber2
\-onNextInside
我想要这样的东西:
/-onNextInside
Flowable/-onNextOutsideSubscriber1
\-onNextOutsideSubscriber2
在代码中看起来像
insideManager.observeSocket().subscribe({do something first})
insideManager.observeSocket().subscribe({do something second})
问题是在这种情况下我调用了两次 onNextInside
有可能吗?
对于这种情况,您有主题。
var yourSubject = PublishSubject<Type>.create()
fun observeSocket() = socketManager.doOnNext(yourSubject::onNext)
然后您可以在您的 observeSocket() 和 another/multiple 订阅者上观察该主题。
如果你想共享一个 Observables 的发射,你可以使用 share() 运算符,它确保即使订阅了多个订阅者,它也只创建一个实例并使用
共享发射的数据
fun observeSocket() = socketManager
.doOnNext(insideMessageHandler::handle)
.share()
无论如何,为了这个目的,将 flat/emit 到 doOnNext 中的另一个可观察对象的反应性东西不是一个好的用例。
最好共享一个 Flowable 并多次订阅,然后根据需要平面映射值,而不是将所有内容都放在一个 flowable 中,因为如果流中出现错误,整个流可能不会再发出任何数据。
问题是重新创建可观察对象:
fun observeSocket() = socketManager
.observe()
.doOnNext{
insideMessageHandler.handle(it)
}
每次调用 observeSocket()
都会创建一个新链,因此将 share()
放在那里不会有什么不同。
改为将此链定义为共享单例:
private val _observeSocket = socketManager
.observe()
.doOnNext{
insideMessageHandler.handle(it)
}
.share()
fun observeSocket() = _observeSocket
我有用于套接字通信的自定义 Rx 适配器。 在它之外,我观察带有消息的 Flowable。 然后我有一些管理器来处理每条消息,然后进一步发出它。
fun observeSocket() = socketManager
.observe()
.doOnNext{
insideMessageHandler.handle(it)
}
然后我有两个订阅者执行 observeSocket().subscribe()
问题是每条消息 insideMessageHandler.handle(it) 都会被调用两次。我想找到每个订阅者都可以使用部分流的方式。不幸的是,observeSocket() 末尾的 .share() 运算符不起作用。
我遇到过两次这样的事情:
/onNextInside
Flowable/-onNextOutsideSubscriber1
Flowable\-onNextOutsideSubscriber2
\-onNextInside
我想要这样的东西:
/-onNextInside
Flowable/-onNextOutsideSubscriber1
\-onNextOutsideSubscriber2
在代码中看起来像
insideManager.observeSocket().subscribe({do something first})
insideManager.observeSocket().subscribe({do something second})
问题是在这种情况下我调用了两次 onNextInside
有可能吗?
对于这种情况,您有主题。
var yourSubject = PublishSubject<Type>.create()
fun observeSocket() = socketManager.doOnNext(yourSubject::onNext)
然后您可以在您的 observeSocket() 和 another/multiple 订阅者上观察该主题。
如果你想共享一个 Observables 的发射,你可以使用 share() 运算符,它确保即使订阅了多个订阅者,它也只创建一个实例并使用
共享发射的数据fun observeSocket() = socketManager
.doOnNext(insideMessageHandler::handle)
.share()
无论如何,为了这个目的,将 flat/emit 到 doOnNext 中的另一个可观察对象的反应性东西不是一个好的用例。
最好共享一个 Flowable 并多次订阅,然后根据需要平面映射值,而不是将所有内容都放在一个 flowable 中,因为如果流中出现错误,整个流可能不会再发出任何数据。
问题是重新创建可观察对象:
fun observeSocket() = socketManager
.observe()
.doOnNext{
insideMessageHandler.handle(it)
}
每次调用 observeSocket()
都会创建一个新链,因此将 share()
放在那里不会有什么不同。
改为将此链定义为共享单例:
private val _observeSocket = socketManager
.observe()
.doOnNext{
insideMessageHandler.handle(it)
}
.share()
fun observeSocket() = _observeSocket