RxJava 订阅许多 observables 不会为所有订阅者触发 onNext()?

RxJava Subscribing to many observables does not trigger onNext() for all subscribers?

当我创建 5 个 observables 并使用单独的订阅者订阅它们时,直觉上我认为每个订阅者都会获得其 observables 的相应数据,通过 onNext() 调用发出:

val compositeSubscription = CompositeDisposable()

fun test() {

        for (i in 0..5) {
            compositeSubscription.add (Observable.create<String>(object : ObservableOnSubscribe<String> {
                override fun subscribe(emitter: ObservableEmitter<String>) {
                    emitter.onNext("somestring")
                    emitter.onComplete()
                }
            }).subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                    .subscribe({
                        Logger.i("testIt onNext")
                    }, {
                        Logger.i("testIt onError")
                    }))
        }
}

不过,我看到的是日志中的一两个"testIt onNext"。

现在,当我在订阅者的 onNext() 中添加延迟时,所有 6 个订阅者的 onNext() 都会被调用。

当一些订阅者的速度不够快,无法赶上他们的数据时,这似乎是一种不正当的条件。这种情况是如何发生的让我避而不谈,因为 subscribe() 应该在 Subscriber 启动并且 运行.

之后被调用

如有任何提示,我们将不胜感激。

从这个代码来看,每个订阅者都应该打印 "testIt onNext"。你确定它没有被打印出来吗?也许 Android Studio 正在折叠相同的行?您是否尝试过为每个订阅者打印不同的内容?