使用 RXJava 2 异步读/写 Realm

Asynchronous read / write to Realm using RXJava 2

我第一次使用 RXJava 2

实现异步操作

目标:

使用库 Retrofit2 从服务器获取 json 数据。如果成功,则将数据写入Realm,并在记录后立即取回数据并发送给RecyclerView的适配器。

于是,我就这样意识到了这一切:

private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) {

    String accessToken = accessDataModel.getAccessToken();

    MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableSubscriber<ChatsModel>() {
                @Override
                public void onNext(ChatsModel chatsModel) {
                    if (chatsRepository.hasData()) {

                        chatsRepository.updateChatsData(chatsModel)
                                .subscribe(new DisposableObserver<ChatsModel>() {
                                    @Override
                                    public void onNext(ChatsModel localChatsModel) {
                                        Log.d(TAG, "DO, onSuccess updated!");
                                        iGetChatsCallback.onGetChatsSuccess(localChatsModel);
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                        Log.d(TAG, "DO, onError when update!");
                                        iGetChatsCallback.onGetChatsError(e.getMessage());
                                    }

                                    @Override
                                    public void onComplete() {
                                        dispose();
                                        Log.d(TAG, "DO, onComplete!");
                                    }
                                });

                    } else {
                        chatsRepository.insertChatsData(chatsModel)
                                .subscribe(new DisposableObserver<ChatsModel>() {
                                    @Override
                                    public void onNext(ChatsModel localChatsModel) {
                                        iGetChatsCallback.onGetChatsSuccess(localChatsModel);
                                        Log.d(TAG, "DO, onSuccess inserted!");
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                        iGetChatsCallback.onGetChatsError(e.getMessage());
                                        Log.d(TAG, "DO, onError when inserting!");
                                    }

                                    @Override
                                    public void onComplete() {
                                        dispose();
                                        Log.d(TAG, "DO, onComplete!");
                                    }
                                });
                    }
                }

                @Override
                public void onError(Throwable t) {
                    Log.d(TAG, "onError" + t.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

我把Realm中的数据写在订阅者的onNext()方法中MyApplication.getRestApi().GetChats()

这是入口代码:

public Observable<ChatsModel> updateChatsData(final ChatsModel chatsModel) {

    return Observable.create(new ObservableOnSubscribe<ChatsModel>() {
        @Override
        public void subscribe(ObservableEmitter<ChatsModel> e) throws Exception {
            if (chatsModel != null) {
                realm.executeTransactionAsync(
                        realm -> realm.copyToRealmOrUpdate(chatsModel),
                        () -> {
                            Log.d(LOG_TAG, "Data success updated!");
                            ChatsModel localChatsModel = getAllChatsData();
                            e.onNext(localChatsModel);
                            e.onComplete();
                        },
                        error -> {
                            Log.d(LOG_TAG, "Update data failed!");
                            e.onError(error);
                        });
            }

        }
    });

}

updateChatsData() 异步写入并在另一个 class 中声明。

如你所见,fetchChatsFromNetwork()方法写起来很繁琐,我觉得是这样

问题:

不管我做的对不对,如果不对,怎么会是对的?

private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) {    
    String accessToken = accessDataModel.getAccessToken();
    Single<ChatsModel> chats = MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version);
    chats.doOnNext((chats) -> {
        chatsRepository.insertOrUpdate(chats);
    }).subscribeOn(Schedulers.io())
    .subscribe();
}

public void updateChatsData(final ChatsModel chatsModel) {
    try(Realm realm = Realm.getDefaultInstance()) {
        realm.executeTransaction(r -> {
            r.insertOrUpdate(chatsModel);
        });
    }
}

public Flowable<List<ChatsModel>> getAllChatsData(Realm realm) {
    RealmQuery<ChatsModel> query = realm.where(ChatsModel.class);
    if(realm.isAutoRefresh()) {
        return query.findAllAsync().asFlowable().filter(RealmResults::isLoaded);
    } else {
        return Flowable.just(query.findAll());
    }
}