RxJava 2 和在 IO 线程上使用 Realm

RxJava 2 and working with Realm on the IO thread

我正在 DataManager 中创建一个方法,它首先从缓存中下载数据,然后从服务器请求数据 API,保存结果并将给定的网络数据发送给演示者(在 MVP 中)。

问题是在 UI 线程上使用 Realm,而我想在后台线程上使用它。我找到了一些关于第一个 RxJava 领域支持的文章,但我们使用的是第二个版本,它有另一个 API,所以这些领域方法对我们没有帮助 (toObservable())。

如何解决这个问题?

此外,正如我所见,所有其他方法都在 IO 线程上处理,只有 Realm 在 Ui 上工作,而不管我放置 subscribeOn(Schedulers.io()) 的事实。为什么会这样?

@Override
public Observable<ChatsRepoAnswerModel> getChats() {
    return getChatsFromCache(STATUS_OK)
            .subscribeOn(Schedulers.io())
            .mergeWith(
                    getChatsService()
                            .getChats()
                            .subscribeOn(Schedulers.io())
                            .map(ChatResponseModel::getResult)
                            .flatMap(mChatsMapper::transformAll)
                            .doOnNext(this::saveChats)
                            .doOnNext(Collections::sort)                            
                            .onErrorResumeNext(getChatsFromCache(STATUS_ERROR))                               
            .observeOn(AndroidSchedulers.mainThread());
}


private void saveChats(List<ChatDataModel> realmObjects) {        
    Realm.getDefaultInstance().executeTransaction(realm -> {
        realm.insertOrUpdate(realmObjects);
    });
}

private Observable<ChatsRepoAnswerModel> getChatsFromCache(int aStatus) {
    Realm realm = Realm.getDefaultInstance();
    RealmResults<ChatDataModel> chats = realm.where(ChatDataModel.class).findAll();
    return processChatResponse(realm.copyFromRealm(chats), aStatus);
}

虽然我认为这完全无视 Realm 试图为您提供的 zero-copy 设计:

  • unidirectional data-flow 来自 auto-updatingreactive 数据集由 Realm 以 RealmResults(意味着当数据集发生变化时您会收到通知)
  • lazy-evaluation,只读取一次访问的元素,RealmResults只是一个"cursor",而RealmObject s 仅在调用访问器时读取数据
  • 一致性:所有托管的 RealmProxies 都指向同一个对象,因此您在任何地方都没有 "out of date" 数据(好吧,除了 non-autoupdating 背景保留的线程,但这通常是用户错误)

这很有趣,因为 realm.copyFromRealm() 创建的非托管对象通常不具有这些属性中的任何一个:

  • 不再auto-updates
  • 急切地评估整个数据集并将所有数据复制到字段
  • 因此,不一定在所有使用的地方都是最新的

无论如何,在后台线程上创建分离的 RealmObject 的解决方案是在该线程上打开实例,复制数据集,然后关闭实例。

Observable.fromCallable(() -> { // <-- defer to whatever thread you're running it on
        try(Realm realm = Realm.getDefaultInstance()) {
            return realm.copyFromRealm(realm.where(Cat.class).findAll());
        } // <-- auto-close
    })
.subscribeOn(Schedulers.whatever());

不过,通常按预期使用 Realm 会更容易,尤其是对于较大的数据集。只有在后台循环线程 (HandlerThread) 上通过 运行 保留 auto-updating Realm 时,复制才真正有意义。