从主线程调用的 RxAndroid

RxAndroid called from main thread

我有这个问题 ;)

我试图调用这个用例,最后 returns 一个 Observable。

但是,尽管使用了调度程序,但在主线程上不断被调用。不知道为什么:

看起来像这样:

class MainViewModel @Inject constructor(private val loadNewsUseCase: LoadNews) : Model {

    override fun loadNews() {
        loadNewsUseCase.execute(NewsObserver(), "")
    }

    override fun dispose() {
        loadNewsUseCase.dispose()
    }
}

class NewsObserver : DisposableObserver<Result>() {
    override fun onComplete() {
        Log.i("TAG", "")
    }

    override fun onNext(t: Result) {
        Log.i("TAG", "")

    }

    override fun onError(e: Throwable) {
        Log.i("TAG", "")

    }
}

-

abstract class UseCase<T, in P>(
        private val computationThreadExecutor: ComputationThreadExecutor,
        private val mainThreadExecutor: MainThreadExecutor,
        private val compositeDisposable: CompositeDisposable = CompositeDisposable()
) {

    abstract fun createUseCase(params: P): Observable<T>

    fun execute(disposableObserver: DisposableObserver<T>, params: P) {
        requireNotNull(disposableObserver)

        val observable = createUseCase(params)
                .subscribeOn(Schedulers.newThread())
                .observeOn(mainThreadExecutor.getThread())

        val disposable = observable.subscribeWith(disposableObserver)

        addDisposable(disposable)
    }

    private fun addDisposable(disposable: Disposable) {
        requireNotNull(disposable)
        compositeDisposable.add(disposable)
    }

    fun dispose() {
        !compositeDisposable.isDisposed.apply { compositeDisposable.dispose() }
    }
}

UseCase具体实现使用DataService从api获取数据,如下所示:

open class NewsDataService(private val newsDataProvider: NewsDataProvider) : NewsService {

    override fun loadNews(): Observable<Result> {

         return Observable.just(newsDataProvider.fetchData())

    }
}

NewsDataProvider 内部是正常的同步改造调用。

问题是,从 mainThread() 中调用每个 beginning useCase。不应在新线程中调用?

替换

Observable.just(foo)

类似

Observable.fromCallable(() -> foo)

Observable.just() 从提供的值创建一个可观察对象,您正在主线程上计算该值。 fromCallable() 接受一个可以在您的订阅线程上调用的回调。

fetchData return 设为 Observable<Result>(或可能 Single,但这需要对代码进行更广泛的更新)。 Retrofit 支持 RxJava.