从数据库+网络加载数据(Room + Retrofit + RxJava2)

Loading data from Database + Network (Room + Retrofit + RxJava2)

我有一个示例 API 请求,其中 return 是用户关注列表的列表。我想在用户加载监视列表屏幕时实现以下 流程 :

  1. 立即从数据库缓存中加载数据。(cacheWatchList)

  2. 在后台发起RetroFit网络调用。

    我。 onSuccess return apiWatchList
    二. onError return cacheWatchList

  3. 差异 cacheWatchListapiWatchList

    我。相同 -> 一切正常,因为数据已经显示给用户,什么也不做。

    二。差异 -> 将 apiWatchList 保存到本地存储并将 apiWatchList 发送到下游。

到目前为止我做了什么?

Watchlist.kt

data class Watchlist(
  val items: List<Repository> = emptyList()
)

LocalStore.kt(Android房间)

  fun saveUserWatchlist(repositories: List<Repository>): Completable {    
    return Completable.fromCallable {      
      watchlistDao.saveAllUserWatchlist(*repositories.toTypedArray())
    }
  }

RemoteStore.kt(改造api调用)

  fun getWatchlist(userId: UUID): Single<Watchlist?> {
    return api.getWatchlist(userId)
  }

DataManager.kt

  fun getWatchlist(userId: UUID): Flowable<List<Repository>?> {
    val localSource: Single<List<Repository>?> =
      localStore.getUserWatchlist()
        .subscribeOn(scheduler.computation)

    val remoteSource: Single<List<Repository>> = remoteStore.getWatchlist(userId)
      .map(Watchlist::items)
      .doOnSuccess { items: List<Repository> ->
        localStore.saveUserWatchlist(items)
          .subscribeOn(scheduler.io)
          .subscribe()
      }
      .onErrorResumeNext { throwable ->
        if (throwable is IOException) {
          return@onErrorResumeNext localStore.getUserWatchlist()
        }
        return@onErrorResumeNext Single.error(throwable)
      }
      .subscribeOn(scheduler.io)

    return Single.concat(localSource, remoteSource)
  }

上述流程的问题是,它为下游(演示者)的每个流源调用 onNext 两次 ,即使两个数据相同。

我可以在演示器中执行数据差异逻辑并进行相应更新,但我希望 DataManager class 为我处理逻辑(CleanArchitecture,SOC)。

我的问题?

  1. 实现上述逻辑的最佳方法是什么?

  2. 我是否泄露了 DataManager 中的内部订阅(参见:doOnSuccess 代码)?当演示者被销毁时,我正在处理外部订阅。

fun getWatchlist(userId: UUID): Observable<List<Repository>?> {
val remoteSource: Single<List<Repository>> = 
remoteStore.getWatchlist(userId)
        .map(Watchlist::items)
        .subscribeOn(scheduler.io)

return localStore.getUserWatchlist()
        .flatMapObservable { listFromLocal: List<Repository> ->
            remoteSource
                    .observeOn(scheduler.computation)
                    .toObservable()
                    .filter { apiWatchList: List<Repository> ->
                        apiWatchList != listFromLocal
                    }
                    .flatMapSingle { apiWatchList ->
                        localSource.saveUserWatchlist(apiWatchList)
                                .andThen(Single.just(apiWatchList))
                    }
                    .startWith(listFromLocal)
        }
}

分步说明:

  1. 从 localStore 加载数据
  2. 每次 localStore 发出数据时,使用 flatMapObservable 订阅 remoteSource。
  3. 由于内部 Observable 有不止一个发射(来自本地的初始数据和来自远程源的更新数据的新数据)将 Single 转换为 Observable。
  4. 将来自 remoteSource 的数据与来自 localStore 的数据进行比较,仅在 newData != localData 的情况下才处理数据。
  5. 对于过滤器启动 localSource 后的每个发射来保存数据,并在完成此操作后继续将数据保存为 Single。
  6. 根据要求,在远程请求开始时应该处理来自 localStore 的数据,只需在运算符链的末尾添加 startWith 即可。