SQLite 事务和 RxJava

SQLite transaction and RxJava

我是如何用 RxJava 保存数据的:

override fun put(note: Note): Observable<Note> {
    validateNote(note)
    return Observable.just(note)
            .doOnNext { dbKeeper.startTransaction() }
            .doOnNext { storeHashtags(note) }
            .doOnNext { storeImages(note) }
            .flatMap { notesDataStore.put(notesMapper.transform(note)) }
            .map { notesMapper.transform(it) }
            .doOnNext { dbKeeper.setTransactionSuccessful() }
            .doOnUnsubscribe { dbKeeper.endTransaction() }
}

然后我像这样使用这个方法:

      notesManager.put(note)
            .switchMap { notesManager.getHashtags() }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {view.setHashtags(it) }

并且 doOnUnsubscribe 从未调用 getHashtags() 试图从被 startTransaction() 锁定的数据库 SELECT。死机,呵呵。 好的。让我们用 doOnTerminate(...).

替换 doOnUnsubscribe(...)
override fun put(note: Note): Observable<Note> {
    validateNote(note)
    return Observable.just(note)
            .doOnNext { dbKeeper.startTransaction() }
            .doOnNext { storeHashtags(note) }
            .doOnNext { storeImages(note) }
            .map { notesMapper.transform(note) }
            .flatMap { notesDataStore.put(it) }
            .map { notesMapper.transform(it) }
            .doOnNext { dbKeeper.setTransactionSuccessful() }
            .doOnTerminate { dbKeeper.endTransaction() }
}

但现在如果 Observablesubscriber.unsubscribe() 打断,交易将不会关闭。

你能推荐什么来解决我的情况?

附加信息: 我使用一个 writableDb 实例来 write/read 数据。

我会说这不适合 Rx;我的建议是使用通常的 try-finally 进行交易(请原谅 Java):

Observable<Note> put(Note note) {
   return just(note)
        .doOnNext(n -> {
            validateNote(n);
            dbKeeper.startTransaction();
            try {
              storeHashtags(n);
              storeImages(n);
              notesDataStore.put(notesMapper.transform(n));
              dbKeeper.setTransactionSuccessful();
            } finally {
              dbKeeper.endTransaction();
            }
         });
}

编辑:也许这更有用:

fun <T> withTransaction(sourceObservable: Observable<T>): Observable<T> {
    val latch = AtomicInteger(1)
    val maybeEndTransaction = Action0 {
        if (latch.decrementAndGet() == 0) {
            endTransaction()
        }
    }
    return Observable.empty<T>()
            .doOnCompleted { startTransaction() }
            .mergeWith(sourceObservable)
            .doOnNext { setTransactionSuccessful() }
            .doOnTerminate(maybeEndTransaction)
            .doOnUnsubscribe(maybeEndTransaction)
}

这样使用:

override fun put(note: Note): Observable<Note> {
  return Observable.just(note)
        .doOnNext { validateNote(note) }
        .doOnNext { storeHashtags(note) }
        .doOnNext { storeImages(note) }
        .flatMap { notesDataStore.put(notesMapper.transform(note)) }
        .map { notesMapper.transform(it) }
        .compose { withTransaction }
}

它将确保交易恰好结束一次;请注意不要在原始可观察链中切换线程(除非您的事务管理器可以处理该事务,或者您修改了调度程序以将新线程与现有事务相关联)。