与多个订阅者一起调用流的一部分?

call part of stream once with multiple subscribers?

我有用于套接字通信的自定义 Rx 适配器。 在它之外,我观察带有消息的 Flowable。 然后我有一些管理器来处理每条消息,然后进一步发出它。

       fun observeSocket() = socketManager
                .observe()
                .doOnNext{
                    insideMessageHandler.handle(it)
                }

然后我有两个订阅者执行 observeSocket().subscribe()

问题是每条消息 insideMessageHandler.handle(it) 都会被调用两次。我想找到每个订阅者都可以使用部分流的方式。不幸的是,observeSocket() 末尾的 .share() 运算符不起作用。

我遇到过两次这样的事情:

         /onNextInside
Flowable/-onNextOutsideSubscriber1
Flowable\-onNextOutsideSubscriber2
         \-onNextInside

我想要这样的东西:

         /-onNextInside
Flowable/-onNextOutsideSubscriber1
        \-onNextOutsideSubscriber2

在代码中看起来像

insideManager.observeSocket().subscribe({do something first})
insideManager.observeSocket().subscribe({do something second})

问题是在这种情况下我调用了两次 onNextInside

有可能吗?

对于这种情况,您有主题。

var yourSubject = PublishSubject<Type>.create()
fun observeSocket() = socketManager.doOnNext(yourSubject::onNext)

然后您可以在您的 observeSocket() 和 another/multiple 订阅者上观察该主题。

如果你想共享一个 Observables 的发射,你可以使用 share() 运算符,它确保即使订阅了多个订阅者,它也只创建一个实例并使用

共享发射的数据
fun observeSocket() = socketManager
            .doOnNext(insideMessageHandler::handle)
            .share()

无论如何,为了这个目的,将 flat/emit 到 doOnNext 中的另一个可观察对象的反应性东西不是一个好的用例。

最好共享一个 Flowable 并多次订阅,然后根据需要平面映射值,而不是将所有内容都放在一个 flowable 中,因为如果流中出现错误,整个流可能不会再发出任何数据。

问题是重新创建可观察对象:

fun observeSocket() = socketManager
        .observe()
        .doOnNext{
            insideMessageHandler.handle(it)
        }

每次调用 observeSocket() 都会创建一个新链,因此将 share() 放在那里不会有什么不同。

改为将此链定义为共享单例:

private val _observeSocket = socketManager
        .observe()
        .doOnNext{
            insideMessageHandler.handle(it)
        }
        .share()


fun observeSocket() = _observeSocket