How to get Observable with List of objects from Realm using RxJava?

This is all I have so far:

public Observable<List<Movie>> getAll() {
    return Observable.just(Movie.class)
            .flatMap(t -> Observable.just(t)
                    .doOnSubscribe(disposable -> realm.executeTransaction(realm1 -> realm1.where(Movie.class).findAll()))
                    .onErrorResumeNext((ObservableSource<? extends Class<Movie>>) observer -> Observable.empty())
                    .map(all -> realm.where(Movie.class).findAll())
            );
}

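But it is really ugly :)

Everything would be fine if I could avoid the duplicated code by storing the result of realm.where(Movie.class).findAll() and reusing it in map(). The RealmResults method addAll is deprecated.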

As mentioned here:

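import io.reactivex.BackpressureStrategy;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposables;
import io.realm.Realm;
import io.realm.RealmChangeListener;
import io.realm.RealmResults;
import java.util.List;

private io.reactivex.Flowable<List<Movie>> getAll() {
    return io.reactivex.Flowable.create(new FlowableOnSubscribe<List<Movie>>() {
        @Override
        public void subscribe(FlowableEmitter<List<Movie>> emitter)
                throws Exception {
            // Open a Realm that lives exactly as long as the subscription.
            Realm realm = Realm.getDefaultInstance();
            RealmResults<Movie> results = realm.where(Movie.class).findAllAsync();
            // Forward every change, but only once the async query has loaded.
            final RealmChangeListener<RealmResults<Movie>> listener = changedResults -> {
                if(!emitter.isCancelled() && results.isLoaded()) {
                     emitter.onNext(results);
                }
            };
            // On dispose: stop listening and close the Realm.
            emitter.setDisposable(Disposables.fromRunnable(() -> {
                results.removeChangeListener(listener);
                realm.close();
            }));
            results.addChangeListener(listener);
        }
    }, BackpressureStrategy.LATEST)
    .subscribeOn(AndroidSchedulers.mainThread())
    .unsubscribeOn(AndroidSchedulers.mainThread());
}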
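
To make the subscribe/dispose lifecycle above easier to see in isolation, here is a minimal sketch in plain Java, with no Realm or RxJava involved. Everything in it is a hypothetical stand-in invented for illustration: `FakeResults` plays the role of `RealmResults`, the `Consumer` plays the role of the emitter, and the returned `Runnable` plays the role of the disposable. It ignores threading and async loading, which the real types have to deal with.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class ListenerBridgeSketch {

    /** Hypothetical stand-in for a live query result that notifies listeners on change. */
    static class FakeResults {
        private final List<String> items = new ArrayList<>();
        private final List<Consumer<List<String>>> listeners = new CopyOnWriteArrayList<>();

        void addChangeListener(Consumer<List<String>> listener) { listeners.add(listener); }
        void removeChangeListener(Consumer<List<String>> listener) { listeners.remove(listener); }
        int listenerCount() { return listeners.size(); }

        /** Adds an item and hands every listener a snapshot of the current contents. */
        void add(String item) {
            items.add(item);
            List<String> snapshot = new ArrayList<>(items);
            for (Consumer<List<String>> listener : listeners) {
                listener.accept(snapshot);
            }
        }
    }

    /**
     * Registers a listener that forwards every change to onNext, and returns
     * a Runnable that removes that listener again (the "disposable").
     */
    static Runnable observe(FakeResults results, Consumer<List<String>> onNext) {
        Consumer<List<String>> listener = snapshot -> onNext.accept(snapshot);
        results.addChangeListener(listener);
        return () -> results.removeChangeListener(listener);
    }

    public static void main(String[] args) {
        FakeResults results = new FakeResults();
        List<List<String>> received = new ArrayList<>();

        Runnable dispose = observe(results, received::add);
        results.add("Alien");
        results.add("Brazil");
        dispose.run();
        results.add("Casablanca"); // listener already removed, so not forwarded

        System.out.println(received); // prints [[Alien], [Alien, Brazil]]
    }
}
```

The point of returning the cleanup action from the same place that registers the listener is that the two cannot drift apart, which is what `emitter.setDisposable(...)` achieves in the Realm version.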

But as @masp said in the comments, you can read more about designing a reactive data layer with Realm and RxJava2 in my article about this on realm.io, which was published a month ago.

So with Realm 4.0.0-RC1 and above, you can actually do:

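private io.reactivex.Flowable<List<Movie>> getAll(Realm realm) {
    if(realm.isAutoRefresh()) { // looper threads: results are kept up to date
        return realm.where(Movie.class)
                .findAllAsync()
                .asFlowable()
                .filter(RealmResults::isLoaded) // skip emissions from before the query has loaded
                // Generics are invariant, so widen RealmResults<Movie> to List<Movie> explicitly.
                .map(results -> (List<Movie>) results);
    } else { // for background threads
        return io.reactivex.Flowable.just(realm.where(Movie.class).findAll());
    }
}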
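
The `filter(RealmResults::isLoaded)` step is easy to overlook, so here is a small plain-Java sketch of what it is for. It assumes that an async query can emit a not-yet-loaded placeholder before the real data arrives. `Emission`, `LoadedFilterSketch`, and `loadedOnly` are hypothetical names made up for this illustration, and a `java.util.stream.Stream` stands in for the `Flowable`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class LoadedFilterSketch {

    /** Hypothetical stand-in for one emission of an async query result. */
    static final class Emission {
        final boolean loaded;
        final List<String> titles;

        Emission(boolean loaded, List<String> titles) {
            this.loaded = loaded;
            this.titles = titles;
        }

        boolean isLoaded() { return loaded; }
    }

    /** Keeps only the emissions whose data has actually been loaded. */
    static List<List<String>> loadedOnly(List<Emission> emissions) {
        return emissions.stream()
                .filter(Emission::isLoaded)
                .map(emission -> emission.titles)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Emission> emissions = Arrays.asList(
                new Emission(false, Collections.<String>emptyList()), // placeholder before the query finishes
                new Emission(true, Arrays.asList("Alien")),
                new Emission(true, Arrays.asList("Alien", "Brazil")));

        System.out.println(loadedOnly(emissions)); // prints [[Alien], [Alien, Brazil]]
    }
}
```

Without the filter, a subscriber would see an empty list first and could briefly render an "no movies" state before the real results show up.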

You can always extract the movie-lookup code into a separate method so it can be reused. Even with a functional API, we should avoid breaking DRY.

public Observable<List<Movie>> getAll() {
    return Observable.just(Movie.class)
            .flatMap(t -> Observable.just(t)
                    .doOnSubscribe(disposable -> realm.executeTransaction(realm1 -> findMovies(realm1)))
                    .onErrorResumeNext((ObservableSource<? extends Class<Movie>>) observer -> Observable.empty())
                    .map(all -> findMovies(realm))
            );
}

// The query lives in one place, so both call sites above share it.
private RealmResults<Movie> findMovies(Realm realm) {
    return realm.where(Movie.class).findAll();
}