领域 - 响应式扩展 RxJava2

Realm - reactive extension RxJava2

我将 Realm 与 RxJava2 结合使用。 Realm 的实例必须由程序员关闭还是自动关闭?

这个链条有问题:

 return Observable.merge(
      getEvents
        .toObservable()
      ,
      mChangeEventsNotification
        .flatMapMaybe(notification ->
          getEvents
          .firstElement()
        )
    )

Realm 抛出 java.lang.IllegalStateException: 此 Realm 实例已关闭,无法使用。

getEvents 的实现与 getList() 相同。

我观看了 RealmObservableFactory 的实现。

operator firstElement 是否可以关闭 merge operator 的第一个 Observable 的 Realm Instance?

Realm 实例在一个线程内的所有 Observer 之间共享?

看起来是Realm 4.3.3版本的bug。当我降级到版本 4.1.0 时,一切都很好。可能是引用计数的问题。

为了回答您的问题,我将询问 Rx 集成的源代码:

public Flowable<RealmResults<E>> asFlowable() {
    if (realm instanceof Realm) {
        return realm.configuration.getRxFactory().from((Realm) realm, this);

默认情况下 RxObservableFactoryRealmObservableFactory,它是:

@Override
public <E> Flowable<RealmResults<E>> from(final Realm realm, final RealmResults<E> results) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<RealmResults<E>>() {
        @Override
        public void subscribe(final FlowableEmitter<RealmResults<E>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final Realm observableRealm = Realm.getInstance(realmConfig);
            resultsRefs.get().acquireReference(results);
            final RealmChangeListener<RealmResults<E>> listener = new RealmChangeListener<RealmResults<E>>() {
                @Override
                public void onChange(RealmResults<E> results) {
                    if (!emitter.isCancelled()) {
                        emitter.onNext(results);
                    }
                }
            };
            results.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    results.removeChangeListener(listener);
                    observableRealm.close();
                    resultsRefs.get().releaseReference(results);
                }
            }));

            // Emit current value immediately
            emitter.onNext(results);

        }
    }, BACK_PRESSURE_STRATEGY);
}

在这种情况下,您调用 asFlowable() 的 RealmResults 的 Realm 实例用于获取其 RealmConfiguration,并且在创建 Flowable 的线程上增加引用计数(这是UI 线程或处理程序线程)。

这意味着即使是以下情况也可以:

public Flowable<List<MyObject>> getList() {
    try(Realm realm = Realm.getDefaultInstance()) {
        return realm.where(MyObject.class)
                    .findAllAsync()
                    .asFlowable()
                    .filter(RealmResults::isLoaded);
    } // auto-close
}

只要您不退订 Flowable,与此 Flowable 关联的 Realm 就会一直保持打开状态。


要回答您的问题,是的,您确实需要关闭从某处获取 RealmResults 的 Realm 实例。