使用 RxJava 同时从本地和远程获取数据
Fetching data from local and remote simultaneously using RxJava
所以我是 RxJava 的初学者,但这是我想要完成的:
MainViewModel 与存储库对话。 Repository 有 LocalDataStore(与数据库对话)和 RemoteDataStore(Retrofit)两者都是接口 DataStore 的不同实现。
我想要实现的是从 returns 一个 Observable 的存储库调用 fetchData 但是:
- 它首先从 RemoteDataStore 中获取它
- 在获取每一个东西 (onNext()) 之后,它将它插入到数据库中
- 如果失败,它 returns 来自 LocalDataStore。
但是,我不知道如何实现这个逻辑。订阅发生在 ViewModel 的一端,但我无法真正将可观察到的从存储库端(?)更改为 LocalDataStore。将数据更新到数据库中也是 returns 一个 Observable(准确地说是 Single),并且它需要订阅才能工作。
谁能给我解释一下或者给我指明一个好的方向?
我的代码(存储库注释中的问题):
远程数据存储
override fun getData(): Observable<SomeData> = api
.getData(token)
.flatMapIterable { x -> x }
本地数据存储
override fun saveData(data: SomeData): Single<SomeData> {
return database.upsert(data)
}
存储库
fun getData(): Observable<SomeData> {
return
remoteDataStore.getData()
.doOnError {
localDataStore.getData() //? this will invoke but nothing happens because I'm not subscribed to it
}
.doOnNext {
saveData(it) //The same as before, nothing will happen
}
}
视图模型
override fun fetchData() {
repository.getData()
.observeOn(androidScheduler)
.subscribeOn(threadScheduler)
.subscribe(
{ data: SomeData ->
dataList.add(data)
},
{ throwable: Throwable? ->
handleError(throwable)
},
{
//send data to view
},
{ disposable: Disposable ->
compositeDisposable.add(disposable)
}
)
}
感谢您的宝贵时间。
onErrorResumeNext
就是您要找的。 doOnError
调用副作用操作,不会用另一个替换原来的 Observable
。
您需要使用 onErrorResumeNext 方法之一。我还建议将您的流类型从 Observable
更改为 Single
,因为您的数据性质似乎是 Get data once or throw error。这只是一个很好的 API 设计。
在您的特定情况下,我将以这种方式实现存储库:
class RepositoryImpl @Inject constructor(private val localRepository: Repository, private val remoteRepository: Repository) : Repository {
override fun getData(): Single<Data> = remoteRepository.getData()
.onErrorResumeNext { throwable ->
if (throwable is IOException) {
return localRepository.getData()
}
return Single.error(throwable)
}
}
你可能会问为什么只捕获IOException
?我通常只处理这个异常,以免错过任何重要但不重要的网络错误。如果您将捕获您可能错过的每个异常,例如 NullPointerException
.
所以我是 RxJava 的初学者,但这是我想要完成的:
MainViewModel 与存储库对话。 Repository 有 LocalDataStore(与数据库对话)和 RemoteDataStore(Retrofit)两者都是接口 DataStore 的不同实现。
我想要实现的是从 returns 一个 Observable 的存储库调用 fetchData 但是:
- 它首先从 RemoteDataStore 中获取它
- 在获取每一个东西 (onNext()) 之后,它将它插入到数据库中
- 如果失败,它 returns 来自 LocalDataStore。
但是,我不知道如何实现这个逻辑。订阅发生在 ViewModel 的一端,但我无法真正将可观察到的从存储库端(?)更改为 LocalDataStore。将数据更新到数据库中也是 returns 一个 Observable(准确地说是 Single),并且它需要订阅才能工作。
谁能给我解释一下或者给我指明一个好的方向?
我的代码(存储库注释中的问题):
远程数据存储
override fun getData(): Observable<SomeData> = api
.getData(token)
.flatMapIterable { x -> x }
本地数据存储
override fun saveData(data: SomeData): Single<SomeData> {
return database.upsert(data)
}
存储库
fun getData(): Observable<SomeData> {
return
remoteDataStore.getData()
.doOnError {
localDataStore.getData() //? this will invoke but nothing happens because I'm not subscribed to it
}
.doOnNext {
saveData(it) //The same as before, nothing will happen
}
}
视图模型
override fun fetchData() {
repository.getData()
.observeOn(androidScheduler)
.subscribeOn(threadScheduler)
.subscribe(
{ data: SomeData ->
dataList.add(data)
},
{ throwable: Throwable? ->
handleError(throwable)
},
{
//send data to view
},
{ disposable: Disposable ->
compositeDisposable.add(disposable)
}
)
}
感谢您的宝贵时间。
onErrorResumeNext
就是您要找的。 doOnError
调用副作用操作,不会用另一个替换原来的 Observable
。
您需要使用 onErrorResumeNext 方法之一。我还建议将您的流类型从 Observable
更改为 Single
,因为您的数据性质似乎是 Get data once or throw error。这只是一个很好的 API 设计。
在您的特定情况下,我将以这种方式实现存储库:
class RepositoryImpl @Inject constructor(private val localRepository: Repository, private val remoteRepository: Repository) : Repository {
override fun getData(): Single<Data> = remoteRepository.getData()
.onErrorResumeNext { throwable ->
if (throwable is IOException) {
return localRepository.getData()
}
return Single.error(throwable)
}
}
你可能会问为什么只捕获IOException
?我通常只处理这个异常,以免错过任何重要但不重要的网络错误。如果您将捕获您可能错过的每个异常,例如 NullPointerException
.