使用 RxJava 将本地数据与远程(或缓存)数据连接起来
Using RxJava to join local data with remote ( or cached ) data
这是工作代码,但我有几个问题以及关于改进它的建议请求。我是 RxJava 的新手,我还没有完全了解如何将这些类型的可观察对象链接在一起。
我有两个模型对象,ListItem
和 UserInfo
。 ListItem
s 存在于本地数据库中,UserInfo
是使用 ListItem
提供的 ID 从服务器获取的。
UserInfo
网络服务接受一个 ID 数组,它将 return 一个 UserInfo
对象的列表。
这段代码的流程如下:
- 从数据库中加载
ListItem
s
- 使用从数据库中获取的
ListItem
s,检查内存缓存以查看我是否已经获取了特定 ListItem
的 UserInfo
- 对于
UserInfo
未缓存的任何项目,从网络中获取它们
- 将获取的
UserInfo
个对象放入缓存
- 重新运行步骤2(方法是
loadCachedUserInfo
)
- Return 结果给订阅者
注意:如果列表被视为 isUserList
.
,则只应为 ListItem
获取 UserInfo
对象
代码如下:
fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder)
subscriber.onNext(listItems)
subscriber.onCompleted()
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}
return@flatMap Observable.just(listItems)
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false)
}
return@flatMap Observable.just(listItems)
}
}
fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
for ( listItem in listItems ) {
listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()]
}
subscriber.onNext(listItems)
subscriber.onCompleted()
}
}
fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<ATListItem>> {
val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null }
val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() }
val records = hashMapOf("records" to ids)
if ( itemsToFetch.count() == 0 ) {
return Observable.just(listItems)
}
return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records)
.map { json ->
val recordsArray = json.arrayValue("records")
for ( i in 0..recordsArray.length() - 1) {
val coreUserInfo = CoreUserInfo(recordsArray.getJSONObject(i))
coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo
coreUserMap[coreUserInfo.userID] = coreUserInfo
coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo }
}
return@map listItems
}.flatMap { loadCachedUserInfo(listItems, userIDIndex = userIDIndex) }
}
用户将通过调用以下方式启动事件序列:
ListController.itemsInList(list)
我对这段代码的疑问是:
- 目前,
loadCachedUserInfo
接收一个 ListItem
和 return 的数组,在缓存项与其关联后,该数组与可观察对象相同。这对我来说感觉不对。我认为这个调用应该只 return 具有与之关联的缓存 UserInfo
的项目。但是,我需要继续将 ListItem
的完整数组传递给下一个方法
2.) 我需要做额外的工作来支持退订吗?
3.) 这是类似的问题 1。我的 fetchUserInfoForListItems
接受一个列表项数组,并且 returns 是一个可观察对象,在获取并重新获取列表项后具有相同的列表项数组运行 通过缓存方法。这对我来说也是不正确的。对于获取的对象,我宁愿使用此方法 return 和 Observable<List<UserInfo>>
。我不明白如何在 itemsInList
中将 ListItem
与新获取的 UserInfo
和 return 相关联 ListItem
的 Observable。
编辑:写完这篇post后,它帮助我意识到了一些事情。我可以用 flatMap 将我的调用包装在一个 Observable.create 中,它可以包含我想从我的 fetchUserInfoForListItems
中提取的智能,让我解决问题 #3。这是更新后的代码:
fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder)
subscriber.onNext(listItems)
subscriber.onCompleted()
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}
return@flatMap Observable.just(listItems)
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap Observable.create<List<ATListItem>> { subscriber ->
fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false).map { userInfoList ->
for (coreUserInfo in userInfoList) {
coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo
coreUserMap[coreUserInfo.userID] = coreUserInfo
coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo }
}
}.flatMap {
loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}.subscribe {
subscriber.onNext(listItems)
subscriber.onCompleted()
}
}
}
return@flatMap Observable.just(listItems)
}
}
fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
listItems.forEach { listItem -> listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()] }
subscriber.onNext(listItems)
subscriber.onCompleted()
}
}
fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<CoreUserInfo>> {
val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null }
val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() }
val records = hashMapOf("records" to ids)
if ( itemsToFetch.count() == 0 ) { return Observable.just(ArrayList<CoreUserInfo>()) }
return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records)
.map { json ->
val userInfo = ArrayList<CoreUserInfo>()
json.arrayValue("records").eachObject { userInfo.add(CoreUserInfo(it)) }
return@map userInfo
}
}
- Currently loadCachedUserInfo takes in an array of ListItem and returns that same array as an observable after the cached items have been associated with it. This feels wrong to me. I think instead this call should only return the items that have a cached UserInfo associated with it. However, I need to continue passing the full array of ListItem to the next method
我不确定我是否理解正确,但如果你只需要副作用(缓存),你可以使用 doOnNext
。例如,
.doOnNext { listItems ->
if ( list.isUserList ) {
cache(listItems, userIDIndex = list.userIDIndex!!)
}
}
fun cache(listItems : List<ATListItem>, userIDIndex : Int) {
// caching
}
- Do I need to do additional work to support unsubscribing?
不,据我所知。
注:
有关 doOnNext
的更多信息,请访问 and here
如果 lambda 中的最后一个语句是表达式,通常您不需要 return@...
。
例如:
.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}
return@flatMap Observable.just(listItems)
}
可以这样写:
.flatMap { listItems ->
if ( list.isUserList )
loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
else
Observable.just(listItems)
}
我没有测试代码。
这是工作代码,但我有几个问题以及关于改进它的建议请求。我是 RxJava 的新手,我还没有完全了解如何将这些类型的可观察对象链接在一起。
我有两个模型对象,ListItem
和 UserInfo
。 ListItem
s 存在于本地数据库中,UserInfo
是使用 ListItem
提供的 ID 从服务器获取的。
UserInfo
网络服务接受一个 ID 数组,它将 return 一个 UserInfo
对象的列表。
这段代码的流程如下:
- 从数据库中加载
ListItem
s - 使用从数据库中获取的
ListItem
s,检查内存缓存以查看我是否已经获取了特定ListItem
的 - 对于
UserInfo
未缓存的任何项目,从网络中获取它们 - 将获取的
UserInfo
个对象放入缓存 - 重新运行步骤2(方法是
loadCachedUserInfo
) - Return 结果给订阅者
UserInfo
注意:如果列表被视为 isUserList
.
ListItem
获取 UserInfo
对象
代码如下:
fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder)
subscriber.onNext(listItems)
subscriber.onCompleted()
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}
return@flatMap Observable.just(listItems)
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false)
}
return@flatMap Observable.just(listItems)
}
}
fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
for ( listItem in listItems ) {
listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()]
}
subscriber.onNext(listItems)
subscriber.onCompleted()
}
}
fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<ATListItem>> {
val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null }
val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() }
val records = hashMapOf("records" to ids)
if ( itemsToFetch.count() == 0 ) {
return Observable.just(listItems)
}
return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records)
.map { json ->
val recordsArray = json.arrayValue("records")
for ( i in 0..recordsArray.length() - 1) {
val coreUserInfo = CoreUserInfo(recordsArray.getJSONObject(i))
coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo
coreUserMap[coreUserInfo.userID] = coreUserInfo
coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo }
}
return@map listItems
}.flatMap { loadCachedUserInfo(listItems, userIDIndex = userIDIndex) }
}
用户将通过调用以下方式启动事件序列:
ListController.itemsInList(list)
我对这段代码的疑问是:
- 目前,
loadCachedUserInfo
接收一个ListItem
和 return 的数组,在缓存项与其关联后,该数组与可观察对象相同。这对我来说感觉不对。我认为这个调用应该只 return 具有与之关联的缓存UserInfo
的项目。但是,我需要继续将ListItem
的完整数组传递给下一个方法
2.) 我需要做额外的工作来支持退订吗?
3.) 这是类似的问题 1。我的 fetchUserInfoForListItems
接受一个列表项数组,并且 returns 是一个可观察对象,在获取并重新获取列表项后具有相同的列表项数组运行 通过缓存方法。这对我来说也是不正确的。对于获取的对象,我宁愿使用此方法 return 和 Observable<List<UserInfo>>
。我不明白如何在 itemsInList
中将 ListItem
与新获取的 UserInfo
和 return 相关联 ListItem
的 Observable。
编辑:写完这篇post后,它帮助我意识到了一些事情。我可以用 flatMap 将我的调用包装在一个 Observable.create 中,它可以包含我想从我的 fetchUserInfoForListItems
中提取的智能,让我解决问题 #3。这是更新后的代码:
fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder)
subscriber.onNext(listItems)
subscriber.onCompleted()
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}
return@flatMap Observable.just(listItems)
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap Observable.create<List<ATListItem>> { subscriber ->
fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false).map { userInfoList ->
for (coreUserInfo in userInfoList) {
coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo
coreUserMap[coreUserInfo.userID] = coreUserInfo
coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo }
}
}.flatMap {
loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}.subscribe {
subscriber.onNext(listItems)
subscriber.onCompleted()
}
}
}
return@flatMap Observable.just(listItems)
}
}
fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
listItems.forEach { listItem -> listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()] }
subscriber.onNext(listItems)
subscriber.onCompleted()
}
}
fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<CoreUserInfo>> {
val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null }
val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() }
val records = hashMapOf("records" to ids)
if ( itemsToFetch.count() == 0 ) { return Observable.just(ArrayList<CoreUserInfo>()) }
return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records)
.map { json ->
val userInfo = ArrayList<CoreUserInfo>()
json.arrayValue("records").eachObject { userInfo.add(CoreUserInfo(it)) }
return@map userInfo
}
}
- Currently loadCachedUserInfo takes in an array of ListItem and returns that same array as an observable after the cached items have been associated with it. This feels wrong to me. I think instead this call should only return the items that have a cached UserInfo associated with it. However, I need to continue passing the full array of ListItem to the next method
我不确定我是否理解正确,但如果你只需要副作用(缓存),你可以使用 doOnNext
。例如,
.doOnNext { listItems ->
if ( list.isUserList ) {
cache(listItems, userIDIndex = list.userIDIndex!!)
}
}
fun cache(listItems : List<ATListItem>, userIDIndex : Int) {
// caching
}
- Do I need to do additional work to support unsubscribing?
不,据我所知。
注:
有关 doOnNext
的更多信息,请访问
如果 lambda 中的最后一个语句是表达式,通常您不需要 return@...
。
例如:
.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}
return@flatMap Observable.just(listItems)
}
可以这样写:
.flatMap { listItems ->
if ( list.isUserList )
loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
else
Observable.just(listItems)
}
我没有测试代码。