SQLite 事务和 RxJava
SQLite transaction and RxJava
我是如何用 RxJava 保存数据的:
override fun put(note: Note): Observable<Note> {
validateNote(note)
return Observable.just(note)
.doOnNext { dbKeeper.startTransaction() }
.doOnNext { storeHashtags(note) }
.doOnNext { storeImages(note) }
.flatMap { notesDataStore.put(notesMapper.transform(note)) }
.map { notesMapper.transform(it) }
.doOnNext { dbKeeper.setTransactionSuccessful() }
.doOnUnsubscribe { dbKeeper.endTransaction() }
}
然后我像这样使用这个方法:
notesManager.put(note)
.switchMap { notesManager.getHashtags() }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {view.setHashtags(it) }
并且 doOnUnsubscribe
从未调用 getHashtags()
试图从被 startTransaction()
锁定的数据库 SELECT。死机,呵呵。
好的。让我们用 doOnTerminate(...)
.
替换 doOnUnsubscribe(...)
override fun put(note: Note): Observable<Note> {
validateNote(note)
return Observable.just(note)
.doOnNext { dbKeeper.startTransaction() }
.doOnNext { storeHashtags(note) }
.doOnNext { storeImages(note) }
.map { notesMapper.transform(note) }
.flatMap { notesDataStore.put(it) }
.map { notesMapper.transform(it) }
.doOnNext { dbKeeper.setTransactionSuccessful() }
.doOnTerminate { dbKeeper.endTransaction() }
}
但现在如果 Observable
被 subscriber.unsubscribe()
打断,交易将不会关闭。
你能推荐什么来解决我的情况?
附加信息:
我使用一个 writableDb 实例来 write/read 数据。
我会说这不适合 Rx;我的建议是使用通常的 try-finally 进行交易(请原谅 Java):
Observable<Note> put(Note note) {
return just(note)
.doOnNext(n -> {
validateNote(n);
dbKeeper.startTransaction();
try {
storeHashtags(n);
storeImages(n);
notesDataStore.put(notesMapper.transform(n));
dbKeeper.setTransactionSuccessful();
} finally {
dbKeeper.endTransaction();
}
});
}
编辑:也许这更有用:
fun <T> withTransaction(sourceObservable: Observable<T>): Observable<T> {
val latch = AtomicInteger(1)
val maybeEndTransaction = Action0 {
if (latch.decrementAndGet() == 0) {
endTransaction()
}
}
return Observable.empty<T>()
.doOnCompleted { startTransaction() }
.mergeWith(sourceObservable)
.doOnNext { setTransactionSuccessful() }
.doOnTerminate(maybeEndTransaction)
.doOnUnsubscribe(maybeEndTransaction)
}
这样使用:
override fun put(note: Note): Observable<Note> {
return Observable.just(note)
.doOnNext { validateNote(note) }
.doOnNext { storeHashtags(note) }
.doOnNext { storeImages(note) }
.flatMap { notesDataStore.put(notesMapper.transform(note)) }
.map { notesMapper.transform(it) }
.compose { withTransaction }
}
它将确保交易恰好结束一次;请注意不要在原始可观察链中切换线程(除非您的事务管理器可以处理该事务,或者您修改了调度程序以将新线程与现有事务相关联)。
我是如何用 RxJava 保存数据的:
override fun put(note: Note): Observable<Note> {
validateNote(note)
return Observable.just(note)
.doOnNext { dbKeeper.startTransaction() }
.doOnNext { storeHashtags(note) }
.doOnNext { storeImages(note) }
.flatMap { notesDataStore.put(notesMapper.transform(note)) }
.map { notesMapper.transform(it) }
.doOnNext { dbKeeper.setTransactionSuccessful() }
.doOnUnsubscribe { dbKeeper.endTransaction() }
}
然后我像这样使用这个方法:
notesManager.put(note)
.switchMap { notesManager.getHashtags() }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {view.setHashtags(it) }
并且 doOnUnsubscribe
从未调用 getHashtags()
试图从被 startTransaction()
锁定的数据库 SELECT。死机,呵呵。
好的。让我们用 doOnTerminate(...)
.
doOnUnsubscribe(...)
override fun put(note: Note): Observable<Note> {
validateNote(note)
return Observable.just(note)
.doOnNext { dbKeeper.startTransaction() }
.doOnNext { storeHashtags(note) }
.doOnNext { storeImages(note) }
.map { notesMapper.transform(note) }
.flatMap { notesDataStore.put(it) }
.map { notesMapper.transform(it) }
.doOnNext { dbKeeper.setTransactionSuccessful() }
.doOnTerminate { dbKeeper.endTransaction() }
}
但现在如果 Observable
被 subscriber.unsubscribe()
打断,交易将不会关闭。
你能推荐什么来解决我的情况?
附加信息: 我使用一个 writableDb 实例来 write/read 数据。
我会说这不适合 Rx;我的建议是使用通常的 try-finally 进行交易(请原谅 Java):
Observable<Note> put(Note note) {
return just(note)
.doOnNext(n -> {
validateNote(n);
dbKeeper.startTransaction();
try {
storeHashtags(n);
storeImages(n);
notesDataStore.put(notesMapper.transform(n));
dbKeeper.setTransactionSuccessful();
} finally {
dbKeeper.endTransaction();
}
});
}
编辑:也许这更有用:
fun <T> withTransaction(sourceObservable: Observable<T>): Observable<T> {
val latch = AtomicInteger(1)
val maybeEndTransaction = Action0 {
if (latch.decrementAndGet() == 0) {
endTransaction()
}
}
return Observable.empty<T>()
.doOnCompleted { startTransaction() }
.mergeWith(sourceObservable)
.doOnNext { setTransactionSuccessful() }
.doOnTerminate(maybeEndTransaction)
.doOnUnsubscribe(maybeEndTransaction)
}
这样使用:
override fun put(note: Note): Observable<Note> {
return Observable.just(note)
.doOnNext { validateNote(note) }
.doOnNext { storeHashtags(note) }
.doOnNext { storeImages(note) }
.flatMap { notesDataStore.put(notesMapper.transform(note)) }
.map { notesMapper.transform(it) }
.compose { withTransaction }
}
它将确保交易恰好结束一次;请注意不要在原始可观察链中切换线程(除非您的事务管理器可以处理该事务,或者您修改了调度程序以将新线程与现有事务相关联)。