订阅 scheduler.io() 无效
subscribing on scheduler.io() is not working
我的视图模型调用存储库方法从房间数据库和网络中获取一些数据。
class Repository @Inject constructor(
private val remoteDatasource: IRemoteSource,
private val localDatasource: ILocalSource,
private val subscriberScheduler: Scheduler,
private val observerScheduler: Scheduler
) : IRepository {
//this method fetches data from room
override fun getData(): Flowable<Boolean> {
return localDatasource.shouldFetchRemote().subscribeOn(subscriberScheduler)
.observeOn(observerScheduler)
}
// makes api call
override fun getRemoteData(): Flowable<Data> {
return remoteDatasource.getData().subscribeOn(subscriberScheduler)
.observeOn(observerScheduler)
}
subscriberScheduler 是 Schedulers.io(),观察者调度程序是 AndroidSchedulers.mainThread()。
当我从房间查询时出现异常,说该操作在主线程中。
此外,当我从远程源获取数据时,我检查线程,它是主线程,但这也不例外,就像主线程上的网络调用一样。
这是我的本地资源 class,它使用了房间:
class Localsource constructor(private val dataDao: DataDao):ILocalSource {
override fun shouldFetchRemote(): Flowable<Boolean> {
if (Looper.getMainLooper().thread == Thread.currentThread()) {
Log.v("thread","main thread")
//this log prints
}
//exception thrown here
return Flowable.just(dataDao.isDataPresent() != 0)
}
这里是 class 用于 RemoteSource
@OpenForTesting
class Remotesource @Inject constructor():IRemoteSource{
override fun getData(): Flowable<Data> {
if (Looper.getMainLooper().getThread() == Thread.currentThread()) {
Log.v("thread","main thread")
//this log prints but no exception is thrown like network call on main thread.
}
return service.getData().flatMap { Flowable.just(it.data) }
}
}
你对发生的事情的假设是错误的。这是一个问题。
让我们看看 shouldFetchRemote()
方法。
//This part will always be on the main thread because it is run on it.
//Schedulers applied only for the created reactive
//stream(Flowable, Observable, Single etc.) but not for the rest of the code in the method.
if (Looper.getMainLooper().thread == Thread.currentThread()) {
Log.v("thread","main thread")
//this log prints
}
//exception thrown here
//Yes it is correct that exception is thrown in this line
//because you do reach for the database on the main thread here.
//It is because Flowable.just() creates stream out of the independent data
//that does not know anything about scheduler here.
// dataDao.isDataPresent() - is run on the main thread
//because it is not yet part of the reactive stream - only its result is!!!!
//That is crucial
return Flowable.just(dataDao.isDataPresent() != 0)
为了将函数包含到流中,您需要采用另一种方法。 Room 可以直接 return Flowables 并存储布尔值。这样你就可以这样使用了
在 DAO 中
@Query(...)
Boolean isDataPresent(): Flowable<Boolean>
在您的本地资源中
override fun shouldFetchRemote(): Flowable<Boolean> = dataDao.isDataPresent()
这样它将按预期工作,因为现在整个功能都是反应流的一部分,并将对调度程序作出反应。
与远程源相同。 Retrofit
可以 return 开箱即用的 Observable 或 Flowable
interface Service{
@GET("data")
fun getData(): Flowable<Data>
}
// and the repo will be
val service = retrofit.create(Service::class.java)
override fun getData(): Flowable<Data> = service.getData()
这样一切都会按预期工作,因为现在它是流的一部分。
如果您想使用来自 Room 或 Retrofit 的计划数据 - 您可以这样做。唯一的问题是 Flowable.just() 不起作用。
例如,对于您的本地资源,您需要执行类似
的操作
//DAO
@Query(...)
Boolean isDataPresent(): Boolean
override fun shouldFetchRemote(): Flowable<Boolean> = Flowable.create<Boolean>(
{ emitter ->
emitter.onNext(dataDao.isDataPresent())
emitter.onComplete() //This is crucial because without onComplete the emitter won't emit anything
//There is also emitter.onError(throwable: Throwable) to handle errors
}, BackpressureStrategy.LATEST).toObservable() // there are different Backpressure Strategies
Obserwable 和其他反应流有类似的工厂。
通常我会推荐您阅读 documentation。
我的视图模型调用存储库方法从房间数据库和网络中获取一些数据。
class Repository @Inject constructor(
private val remoteDatasource: IRemoteSource,
private val localDatasource: ILocalSource,
private val subscriberScheduler: Scheduler,
private val observerScheduler: Scheduler
) : IRepository {
//this method fetches data from room
override fun getData(): Flowable<Boolean> {
return localDatasource.shouldFetchRemote().subscribeOn(subscriberScheduler)
.observeOn(observerScheduler)
}
// makes api call
override fun getRemoteData(): Flowable<Data> {
return remoteDatasource.getData().subscribeOn(subscriberScheduler)
.observeOn(observerScheduler)
}
subscriberScheduler 是 Schedulers.io(),观察者调度程序是 AndroidSchedulers.mainThread()。 当我从房间查询时出现异常,说该操作在主线程中。 此外,当我从远程源获取数据时,我检查线程,它是主线程,但这也不例外,就像主线程上的网络调用一样。
这是我的本地资源 class,它使用了房间:
class Localsource constructor(private val dataDao: DataDao):ILocalSource {
override fun shouldFetchRemote(): Flowable<Boolean> {
if (Looper.getMainLooper().thread == Thread.currentThread()) {
Log.v("thread","main thread")
//this log prints
}
//exception thrown here
return Flowable.just(dataDao.isDataPresent() != 0)
}
这里是 class 用于 RemoteSource
@OpenForTesting
class Remotesource @Inject constructor():IRemoteSource{
override fun getData(): Flowable<Data> {
if (Looper.getMainLooper().getThread() == Thread.currentThread()) {
Log.v("thread","main thread")
//this log prints but no exception is thrown like network call on main thread.
}
return service.getData().flatMap { Flowable.just(it.data) }
}
}
你对发生的事情的假设是错误的。这是一个问题。
让我们看看 shouldFetchRemote()
方法。
//This part will always be on the main thread because it is run on it.
//Schedulers applied only for the created reactive
//stream(Flowable, Observable, Single etc.) but not for the rest of the code in the method.
if (Looper.getMainLooper().thread == Thread.currentThread()) {
Log.v("thread","main thread")
//this log prints
}
//exception thrown here
//Yes it is correct that exception is thrown in this line
//because you do reach for the database on the main thread here.
//It is because Flowable.just() creates stream out of the independent data
//that does not know anything about scheduler here.
// dataDao.isDataPresent() - is run on the main thread
//because it is not yet part of the reactive stream - only its result is!!!!
//That is crucial
return Flowable.just(dataDao.isDataPresent() != 0)
为了将函数包含到流中,您需要采用另一种方法。 Room 可以直接 return Flowables 并存储布尔值。这样你就可以这样使用了
在 DAO 中
@Query(...)
Boolean isDataPresent(): Flowable<Boolean>
在您的本地资源中
override fun shouldFetchRemote(): Flowable<Boolean> = dataDao.isDataPresent()
这样它将按预期工作,因为现在整个功能都是反应流的一部分,并将对调度程序作出反应。
与远程源相同。 Retrofit
可以 return 开箱即用的 Observable 或 Flowable
interface Service{
@GET("data")
fun getData(): Flowable<Data>
}
// and the repo will be
val service = retrofit.create(Service::class.java)
override fun getData(): Flowable<Data> = service.getData()
这样一切都会按预期工作,因为现在它是流的一部分。
如果您想使用来自 Room 或 Retrofit 的计划数据 - 您可以这样做。唯一的问题是 Flowable.just() 不起作用。
例如,对于您的本地资源,您需要执行类似
的操作//DAO
@Query(...)
Boolean isDataPresent(): Boolean
override fun shouldFetchRemote(): Flowable<Boolean> = Flowable.create<Boolean>(
{ emitter ->
emitter.onNext(dataDao.isDataPresent())
emitter.onComplete() //This is crucial because without onComplete the emitter won't emit anything
//There is also emitter.onError(throwable: Throwable) to handle errors
}, BackpressureStrategy.LATEST).toObservable() // there are different Backpressure Strategies
Obserwable 和其他反应流有类似的工厂。
通常我会推荐您阅读 documentation。