SQLite 不喜欢响应式编程?

SQLite does not like reactive programming?

我已经玩 Netflix 的 RxJava 几个月了。响应式编程改变了我的整个编程方法。它真正带来了最好的函数式编程所提供的。

但是,我想使用 SQLite 的响应式编程。 David Moten 编写了 a great library 以围绕 JDBC 集成 RxJava。但是 SQLite 似乎有问题。它不喜欢一个查询推送 ResultSet,其中每个记录迭代都被转换为一个对象并沿着链驱动另一个查询。

假设我有两个表

CREATE TABLE TABLE_ONE (
    ID    INTEGER PRIMARY KEY
                  NOT NULL,
    VALUE INTEGER NOT NULL
);

CREATE TABLE TABLE_TWO (
    ID         INTEGER NOT NULL
                       PRIMARY KEY,
    FOREIGN_ID INTEGER NOT NULL
                       REFERENCES TABLE_ONE ([KEY]),
    VALUE      INTEGER NOT NULL
); 

我创建了一个 monad 来执行一些 INSERT parent/SELECT parent/INSERT child/SELECT 子类操作。

import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;

import java.sql.Connection;

public final class Test {

    public static void main(String[] args) {

        Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get();
        Database db = Database.from(con);

        Observable<Integer> inputs = Observable.just(100,200,300);

        db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)")
                .parameters(inputs)
                .returnGeneratedKeys()
                .getAs(Integer.class)
                .flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
                                .parameter(k)
                                .get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
                ).flatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
                                .parameter(t1.id)
                                .parameter(t1.value)
                                .returnGeneratedKeys()
                                .getAs(Integer.class)
                ).flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
                                .parameter(k)
                                .get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
                ).subscribe(System.out::println, Throwable::printStackTrace);

        db.close();
    }
    private static final class Type1 {
        private final int id;
        private final int value;
        private Type1(int id, int value) {
            this.id = id;
            this.value = value;
        }
    }
    private static final class Type2 {
        private final int id;
        private final int foreignId;
        private final int value;
        private Type2(int id, int foreignKey, int value) {
            this.id = id;
            this.foreignId = foreignKey;
            this.value = value;
        }
        @Override
        public String toString() {
            return "Type2{" +
                    "id=" + id +
                    ", foreignId=" + foreignId +
                    ", value=" + value +
                    '}';
        }
    }
}

更具体地说,这是对所有三个数字 (100、200、300) 发生的过程...

1) INSERT a TABLE_ONE record, get its primary key ID
2) SELECT that TABLE_ONE record with ID
3) Turn it into a Type1 Object
4) INSERT a TABLE_TWO record with Type1's `id for the foreign key (and get primary key ID)
5) SELECT TABLE_TWO record with ID
6) Turn it into a Type2 Object

这一切都是原子发生的,每 100、200、300 个值和 4 updates/queries 发生在这个链中的每个值。

但是,我收到 SQLITE_INTERRUPT 错误

java.sql.SQLException: [SQLITE_INTERRUPT]  Operation terminated by sqlite3_interrupt() (interrupted)
    at org.sqlite.core.DB.newSQLException(DB.java:890)
    at org.sqlite.core.DB.newSQLException(DB.java:901)
    at org.sqlite.core.DB.throwex(DB.java:868)
    at org.sqlite.jdbc3.JDBC3ResultSet.next(JDBC3ResultSet.java:93)
    at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:112)
    at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:75)

但是第一项成功地通过链推送,插入到两个表中,并打印,尽管我还有两个值(200 和 300)要走。

Type2{id=1, foreignId=1, value=100}

我的理论是,当每个发出的项目 O 从一个查询推送到下一个查询时,它会中断并取消前一个查询的迭代,如 X

所示
QUERY OP 4----------------------O-
QUERY OP 3----------------O-----X-
QUERY OP 2------------O---X-------
QUERY OP 1-------O----X-----------

因此,第一个发出的项目通过了,但它在它后面留下了一系列中断的查询来驱动当前的查询,因为查询迭代被终止,所以下一个项目没有办法 onNext()' .

SQLite 和 RxJava 的朋友们,你们能想办法解决这个问题吗?我可以配置 SQLite 设置来停止此中断吗?或者是否有 RxJava 组合技巧可以防止中断?

我还使用上面的测试创建了一个简单的 Git 存储库。 https://github.com/thomasnield/rxjava_sqlite_test

似乎与这样一个事实有关,即使用 SQLite,你不能同时拥有一个突出的游标来表示 SELECT 和 COMMIT,这是默认情况下在 INSERT 和 UPDATE 之后隐含的自动提交模式。这可以防止简单地通过游标遍历 SELECT 产生的行并执行 INSERT。解决方法是简单地获取所有结果,然后循环遍历它们。我不喜欢它。

我不知道这个库,但在查看了它的某些部分后,我确定它使用游标来延迟获取结果,这就是导致问题的原因。您可以通过强制它获取每个 SELECT.

的所有结果来解决此问题

虽然肯定不理想,但Dan的解释似乎证实了我的担心。幸运的是,有一些方法可以解决这个问题。即使像 toList() 这样用于此目的的运算符在某种程度上是 Rx 的反模式,它也比根本不使用 Rx 要好得多。

在这里,我使用 toList().concatMap(Observable::from) 来收集所有排放量,并在每个阶段之间再次拉平它们。

import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;

import java.sql.Connection;

public final class Test {

    public static void main(String[] args) {

        Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get();
        Database db = Database.from(con);

        Observable<Integer> inputs = Observable.just(100,200,300);

        db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)")
                .parameters(inputs)
                .returnGeneratedKeys()
                .getAs(Integer.class)
                .toList().concatMap(Observable::from)
                .concatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
                                .parameter(k)
                                .get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
                )
                .toList().concatMap(Observable::from)
                .concatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
                                .parameter(t1.id)
                                .parameter(t1.value)
                                .returnGeneratedKeys()
                                .getAs(Integer.class)
                ).toList().concatMap(Observable::from)
                .concatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
                                .parameter(k)
                                .get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
                ).subscribe(System.out::println, Throwable::printStackTrace);

        db.close();
    }
    private static final class Type1 {
        private final int id;
        private final int value;
        private Type1(int id, int value) {
            this.id = id;
            this.value = value;
        }
    }
    private static final class Type2 {
        private final int id;
        private final int foreignId;
        private final int value;
        private Type2(int id, int foreignKey, int value) {
            this.id = id;
            this.foreignId = foreignKey;
            this.value = value;
        }
        @Override
        public String toString() {
            return "Type2{" +
                    "id=" + id +
                    ", foreignId=" + foreignId +
                    ", value=" + value +
                    '}';
        }
    }
}