订阅 scheduler.io() 无效

subscribing on scheduler.io() is not working

我的视图模型调用存储库方法从房间数据库和网络中获取一些数据。

     class Repository @Inject constructor(
        private val remoteDatasource: IRemoteSource,
        private val localDatasource: ILocalSource,
        private val subscriberScheduler: Scheduler,
        private val observerScheduler: Scheduler
    ) : IRepository {
    //this method fetches data from room
        override fun getData(): Flowable<Boolean> {
            return localDatasource.shouldFetchRemote().subscribeOn(subscriberScheduler)
           .observeOn(observerScheduler)
        }
// makes api call
override fun getRemoteData(): Flowable<Data> {
            return remoteDatasource.getData().subscribeOn(subscriberScheduler)
           .observeOn(observerScheduler)
        }

subscriberScheduler 是 Schedulers.io(),观察者调度程序是 AndroidSchedulers.mainThread()。 当我从房间查询时出现异常,说该操作在主线程中。 此外,当我从远程源获取数据时,我检查线程,它是主线程,但这也不例外,就像主线程上的网络调用一样。

这是我的本地资源 class,它使用了房间:

class Localsource  constructor(private val dataDao: DataDao):ILocalSource {

    override fun shouldFetchRemote(): Flowable<Boolean> {
        if (Looper.getMainLooper().thread == Thread.currentThread()) {
            Log.v("thread","main thread")
            //this log prints
        }
       //exception thrown here
       return Flowable.just(dataDao.isDataPresent() != 0)
}

这里是 class 用于 RemoteSource

@OpenForTesting
class Remotesource @Inject constructor():IRemoteSource{



    override fun getData(): Flowable<Data> {
        if (Looper.getMainLooper().getThread() == Thread.currentThread()) {
            Log.v("thread","main thread") 
    //this log prints but no exception is thrown like network call on main thread.

        }
        return service.getData().flatMap { Flowable.just(it.data) }
    }
    }

你对发生的事情的假设是错误的。这是一个问题。

让我们看看 shouldFetchRemote() 方法。

//This part will always be on the main thread because it is run on it.
//Schedulers applied only for the created reactive 
//stream(Flowable, Observable, Single etc.) but not for the rest of the code in the method.
        if (Looper.getMainLooper().thread == Thread.currentThread()) {
            Log.v("thread","main thread")
            //this log prints
        }
       //exception thrown here
//Yes it is correct that exception is thrown in this line 
//because you do reach for the database on the main thread here.

//It is because Flowable.just() creates stream out of the independent data 
//that does not know anything about scheduler here.

// dataDao.isDataPresent() - is run on the main thread 
//because it is not yet part of the reactive stream - only its result is!!!! 
//That is crucial
       return Flowable.just(dataDao.isDataPresent() != 0)

为了将函数包含到流中,您需要采用另一种方法。 Room 可以直接 return Flowables 并存储布尔值。这样你就可以这样使用了

在 DAO 中

@Query(...)
Boolean isDataPresent(): Flowable<Boolean>

在您的本地资源中

 override fun shouldFetchRemote(): Flowable<Boolean> = dataDao.isDataPresent()

这样它将按预期工作,因为现在整个功能都是反应流的一部分,并将对调度程序作出反应。

与远程源相同。 Retrofit 可以 return 开箱即用的 Observable 或 Flowable

interface Service{

@GET("data")
fun getData(): Flowable<Data>

}

// and the repo will be

val service = retrofit.create(Service::class.java)

override fun getData(): Flowable<Data> = service.getData()

这样一切都会按预期工作,因为现在它是流的一部分。

如果您想使用来自 Room 或 Retrofit 的计划数据 - 您可以这样做。唯一的问题是 Flowable.just() 不起作用。

例如,对于您的本地资源,您需要执行类似

的操作
//DAO
@Query(...)
Boolean isDataPresent(): Boolean


 override fun shouldFetchRemote(): Flowable<Boolean> = Flowable.create<Boolean>(
                    { emitter ->
                      emitter.onNext(dataDao.isDataPresent())
                      emitter.onComplete() //This is crucial because without onComplete the emitter won't emit anything
//There is also emitter.onError(throwable: Throwable) to handle errors

 }, BackpressureStrategy.LATEST).toObservable() // there are different Backpressure Strategies

Obserwable 和其他反应流有类似的工厂。

通常我会推荐您阅读 documentation