领域 - 响应式扩展 RxJava2
Realm - reactive extension RxJava2
我将 Realm 与 RxJava2 结合使用。 Realm 的实例必须由程序员关闭还是自动关闭?
这个链条有问题:
return Observable.merge(
getEvents
.toObservable()
,
mChangeEventsNotification
.flatMapMaybe(notification ->
getEvents
.firstElement()
)
)
Realm 抛出 java.lang.IllegalStateException: 此 Realm 实例已关闭,无法使用。
getEvents 的实现与 getList() 相同。
我观看了 RealmObservableFactory 的实现。
operator firstElement 是否可以关闭 merge operator 的第一个 Observable 的 Realm Instance?
Realm 实例在一个线程内的所有 Observer 之间共享?
看起来是Realm 4.3.3版本的bug。当我降级到版本 4.1.0 时,一切都很好。可能是引用计数的问题。
为了回答您的问题,我将询问 Rx 集成的源代码:
public Flowable<RealmResults<E>> asFlowable() {
if (realm instanceof Realm) {
return realm.configuration.getRxFactory().from((Realm) realm, this);
默认情况下 RxObservableFactory
是 RealmObservableFactory
,它是:
@Override
public <E> Flowable<RealmResults<E>> from(final Realm realm, final RealmResults<E> results) {
final RealmConfiguration realmConfig = realm.getConfiguration();
return Flowable.create(new FlowableOnSubscribe<RealmResults<E>>() {
@Override
public void subscribe(final FlowableEmitter<RealmResults<E>> emitter) throws Exception {
// Gets instance to make sure that the Realm is open for as long as the
// Observable is subscribed to it.
final Realm observableRealm = Realm.getInstance(realmConfig);
resultsRefs.get().acquireReference(results);
final RealmChangeListener<RealmResults<E>> listener = new RealmChangeListener<RealmResults<E>>() {
@Override
public void onChange(RealmResults<E> results) {
if (!emitter.isCancelled()) {
emitter.onNext(results);
}
}
};
results.addChangeListener(listener);
// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
@Override
public void run() {
results.removeChangeListener(listener);
observableRealm.close();
resultsRefs.get().releaseReference(results);
}
}));
// Emit current value immediately
emitter.onNext(results);
}
}, BACK_PRESSURE_STRATEGY);
}
在这种情况下,您调用 asFlowable()
的 RealmResults 的 Realm 实例用于获取其 RealmConfiguration,并且在创建 Flowable 的线程上增加引用计数(这是UI 线程或处理程序线程)。
这意味着即使是以下情况也可以:
public Flowable<List<MyObject>> getList() {
try(Realm realm = Realm.getDefaultInstance()) {
return realm.where(MyObject.class)
.findAllAsync()
.asFlowable()
.filter(RealmResults::isLoaded);
} // auto-close
}
只要您不退订 Flowable,与此 Flowable 关联的 Realm 就会一直保持打开状态。
要回答您的问题,是的,您确实需要关闭从某处获取 RealmResults 的 Realm 实例。
我将 Realm 与 RxJava2 结合使用。 Realm 的实例必须由程序员关闭还是自动关闭?
这个链条有问题:
return Observable.merge(
getEvents
.toObservable()
,
mChangeEventsNotification
.flatMapMaybe(notification ->
getEvents
.firstElement()
)
)
Realm 抛出 java.lang.IllegalStateException: 此 Realm 实例已关闭,无法使用。
getEvents 的实现与 getList() 相同。
我观看了 RealmObservableFactory 的实现。
operator firstElement 是否可以关闭 merge operator 的第一个 Observable 的 Realm Instance?
Realm 实例在一个线程内的所有 Observer 之间共享?
看起来是Realm 4.3.3版本的bug。当我降级到版本 4.1.0 时,一切都很好。可能是引用计数的问题。
为了回答您的问题,我将询问 Rx 集成的源代码:
public Flowable<RealmResults<E>> asFlowable() {
if (realm instanceof Realm) {
return realm.configuration.getRxFactory().from((Realm) realm, this);
默认情况下 RxObservableFactory
是 RealmObservableFactory
,它是:
@Override
public <E> Flowable<RealmResults<E>> from(final Realm realm, final RealmResults<E> results) {
final RealmConfiguration realmConfig = realm.getConfiguration();
return Flowable.create(new FlowableOnSubscribe<RealmResults<E>>() {
@Override
public void subscribe(final FlowableEmitter<RealmResults<E>> emitter) throws Exception {
// Gets instance to make sure that the Realm is open for as long as the
// Observable is subscribed to it.
final Realm observableRealm = Realm.getInstance(realmConfig);
resultsRefs.get().acquireReference(results);
final RealmChangeListener<RealmResults<E>> listener = new RealmChangeListener<RealmResults<E>>() {
@Override
public void onChange(RealmResults<E> results) {
if (!emitter.isCancelled()) {
emitter.onNext(results);
}
}
};
results.addChangeListener(listener);
// Cleanup when stream is disposed
emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
@Override
public void run() {
results.removeChangeListener(listener);
observableRealm.close();
resultsRefs.get().releaseReference(results);
}
}));
// Emit current value immediately
emitter.onNext(results);
}
}, BACK_PRESSURE_STRATEGY);
}
在这种情况下,您调用 asFlowable()
的 RealmResults 的 Realm 实例用于获取其 RealmConfiguration,并且在创建 Flowable 的线程上增加引用计数(这是UI 线程或处理程序线程)。
这意味着即使是以下情况也可以:
public Flowable<List<MyObject>> getList() {
try(Realm realm = Realm.getDefaultInstance()) {
return realm.where(MyObject.class)
.findAllAsync()
.asFlowable()
.filter(RealmResults::isLoaded);
} // auto-close
}
只要您不退订 Flowable,与此 Flowable 关联的 Realm 就会一直保持打开状态。
要回答您的问题,是的,您确实需要关闭从某处获取 RealmResults 的 Realm 实例。