RxJava 订阅许多 observables 不会为所有订阅者触发 onNext()?
RxJava Subscribing to many observables does not trigger onNext() for all subscribers?
当我创建 5 个 observables 并使用单独的订阅者订阅它们时,直觉上我认为每个订阅者都会获得其 observables 的相应数据,通过 onNext() 调用发出:
val compositeSubscription = CompositeDisposable()
fun test() {
for (i in 0..5) {
compositeSubscription.add (Observable.create<String>(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
emitter.onNext("somestring")
emitter.onComplete()
}
}).subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
Logger.i("testIt onNext")
}, {
Logger.i("testIt onError")
}))
}
}
不过,我看到的是日志中的一两个"testIt onNext"。
现在,当我在订阅者的 onNext() 中添加延迟时,所有 6 个订阅者的 onNext() 都会被调用。
当一些订阅者的速度不够快,无法赶上他们的数据时,这似乎是一种不正当的条件。这种情况是如何发生的让我避而不谈,因为 subscribe() 应该在 Subscriber 启动并且 运行.
之后被调用
如有任何提示,我们将不胜感激。
从这个代码来看,每个订阅者都应该打印 "testIt onNext"。你确定它没有被打印出来吗?也许 Android Studio 正在折叠相同的行?您是否尝试过为每个订阅者打印不同的内容?
当我创建 5 个 observables 并使用单独的订阅者订阅它们时,直觉上我认为每个订阅者都会获得其 observables 的相应数据,通过 onNext() 调用发出:
val compositeSubscription = CompositeDisposable()
fun test() {
for (i in 0..5) {
compositeSubscription.add (Observable.create<String>(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
emitter.onNext("somestring")
emitter.onComplete()
}
}).subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
Logger.i("testIt onNext")
}, {
Logger.i("testIt onError")
}))
}
}
不过,我看到的是日志中的一两个"testIt onNext"。
现在,当我在订阅者的 onNext() 中添加延迟时,所有 6 个订阅者的 onNext() 都会被调用。
当一些订阅者的速度不够快,无法赶上他们的数据时,这似乎是一种不正当的条件。这种情况是如何发生的让我避而不谈,因为 subscribe() 应该在 Subscriber 启动并且 运行.
之后被调用如有任何提示,我们将不胜感激。
从这个代码来看,每个订阅者都应该打印 "testIt onNext"。你确定它没有被打印出来吗?也许 Android Studio 正在折叠相同的行?您是否尝试过为每个订阅者打印不同的内容?