SQLite 不喜欢响应式编程?
SQLite does not like reactive programming?
我已经玩 Netflix 的 RxJava 几个月了。响应式编程改变了我的整个编程方法。它真正带来了最好的函数式编程所提供的。
但是,我想使用 SQLite 的响应式编程。 David Moten 编写了 a great library 以围绕 JDBC 集成 RxJava。但是 SQLite 似乎有问题。它不喜欢一个查询推送 ResultSet
,其中每个记录迭代都被转换为一个对象并沿着链驱动另一个查询。
假设我有两个表
CREATE TABLE TABLE_ONE (
ID INTEGER PRIMARY KEY
NOT NULL,
VALUE INTEGER NOT NULL
);
CREATE TABLE TABLE_TWO (
ID INTEGER NOT NULL
PRIMARY KEY,
FOREIGN_ID INTEGER NOT NULL
REFERENCES TABLE_ONE ([KEY]),
VALUE INTEGER NOT NULL
);
我创建了一个 monad 来执行一些 INSERT parent/SELECT parent/INSERT child/SELECT 子类操作。
import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;
import java.sql.Connection;
public final class Test {
public static void main(String[] args) {
Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get();
Database db = Database.from(con);
Observable<Integer> inputs = Observable.just(100,200,300);
db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)")
.parameters(inputs)
.returnGeneratedKeys()
.getAs(Integer.class)
.flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
.parameter(k)
.get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
).flatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
.parameter(t1.id)
.parameter(t1.value)
.returnGeneratedKeys()
.getAs(Integer.class)
).flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
.parameter(k)
.get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
).subscribe(System.out::println, Throwable::printStackTrace);
db.close();
}
private static final class Type1 {
private final int id;
private final int value;
private Type1(int id, int value) {
this.id = id;
this.value = value;
}
}
private static final class Type2 {
private final int id;
private final int foreignId;
private final int value;
private Type2(int id, int foreignKey, int value) {
this.id = id;
this.foreignId = foreignKey;
this.value = value;
}
@Override
public String toString() {
return "Type2{" +
"id=" + id +
", foreignId=" + foreignId +
", value=" + value +
'}';
}
}
}
更具体地说,这是对所有三个数字 (100、200、300) 发生的过程...
1) INSERT a TABLE_ONE record, get its primary key ID
2) SELECT that TABLE_ONE record with ID
3) Turn it into a Type1 Object
4) INSERT a TABLE_TWO record with Type1's `id for the foreign key (and get primary key ID)
5) SELECT TABLE_TWO record with ID
6) Turn it into a Type2 Object
这一切都是原子发生的,每 100、200、300 个值和 4 updates/queries 发生在这个链中的每个值。
但是,我收到 SQLITE_INTERRUPT
错误
java.sql.SQLException: [SQLITE_INTERRUPT] Operation terminated by sqlite3_interrupt() (interrupted)
at org.sqlite.core.DB.newSQLException(DB.java:890)
at org.sqlite.core.DB.newSQLException(DB.java:901)
at org.sqlite.core.DB.throwex(DB.java:868)
at org.sqlite.jdbc3.JDBC3ResultSet.next(JDBC3ResultSet.java:93)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:112)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:75)
但是第一项成功地通过链推送,插入到两个表中,并打印,尽管我还有两个值(200 和 300)要走。
Type2{id=1, foreignId=1, value=100}
我的理论是,当每个发出的项目 O
从一个查询推送到下一个查询时,它会中断并取消前一个查询的迭代,如 X
所示
QUERY OP 4----------------------O-
QUERY OP 3----------------O-----X-
QUERY OP 2------------O---X-------
QUERY OP 1-------O----X-----------
因此,第一个发出的项目通过了,但它在它后面留下了一系列中断的查询来驱动当前的查询,因为查询迭代被终止,所以下一个项目没有办法 onNext()
' .
SQLite 和 RxJava 的朋友们,你们能想办法解决这个问题吗?我可以配置 SQLite 设置来停止此中断吗?或者是否有 RxJava 组合技巧可以防止中断?
我还使用上面的测试创建了一个简单的 Git 存储库。
https://github.com/thomasnield/rxjava_sqlite_test
似乎与这样一个事实有关,即使用 SQLite,你不能同时拥有一个突出的游标来表示 SELECT 和 COMMIT,这是默认情况下在 INSERT 和 UPDATE 之后隐含的自动提交模式。这可以防止简单地通过游标遍历 SELECT 产生的行并执行 INSERT。解决方法是简单地获取所有结果,然后循环遍历它们。我不喜欢它。
我不知道这个库,但在查看了它的某些部分后,我确定它使用游标来延迟获取结果,这就是导致问题的原因。您可以通过强制它获取每个 SELECT.
的所有结果来解决此问题
虽然肯定不理想,但Dan的解释似乎证实了我的担心。幸运的是,有一些方法可以解决这个问题。即使像 toList()
这样用于此目的的运算符在某种程度上是 Rx 的反模式,它也比根本不使用 Rx 要好得多。
在这里,我使用 toList().concatMap(Observable::from)
来收集所有排放量,并在每个阶段之间再次拉平它们。
import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;
import java.sql.Connection;
public final class Test {
public static void main(String[] args) {
Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get();
Database db = Database.from(con);
Observable<Integer> inputs = Observable.just(100,200,300);
db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)")
.parameters(inputs)
.returnGeneratedKeys()
.getAs(Integer.class)
.toList().concatMap(Observable::from)
.concatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
.parameter(k)
.get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
)
.toList().concatMap(Observable::from)
.concatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
.parameter(t1.id)
.parameter(t1.value)
.returnGeneratedKeys()
.getAs(Integer.class)
).toList().concatMap(Observable::from)
.concatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
.parameter(k)
.get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
).subscribe(System.out::println, Throwable::printStackTrace);
db.close();
}
private static final class Type1 {
private final int id;
private final int value;
private Type1(int id, int value) {
this.id = id;
this.value = value;
}
}
private static final class Type2 {
private final int id;
private final int foreignId;
private final int value;
private Type2(int id, int foreignKey, int value) {
this.id = id;
this.foreignId = foreignKey;
this.value = value;
}
@Override
public String toString() {
return "Type2{" +
"id=" + id +
", foreignId=" + foreignId +
", value=" + value +
'}';
}
}
}
我已经玩 Netflix 的 RxJava 几个月了。响应式编程改变了我的整个编程方法。它真正带来了最好的函数式编程所提供的。
但是,我想使用 SQLite 的响应式编程。 David Moten 编写了 a great library 以围绕 JDBC 集成 RxJava。但是 SQLite 似乎有问题。它不喜欢一个查询推送 ResultSet
,其中每个记录迭代都被转换为一个对象并沿着链驱动另一个查询。
假设我有两个表
CREATE TABLE TABLE_ONE (
ID INTEGER PRIMARY KEY
NOT NULL,
VALUE INTEGER NOT NULL
);
CREATE TABLE TABLE_TWO (
ID INTEGER NOT NULL
PRIMARY KEY,
FOREIGN_ID INTEGER NOT NULL
REFERENCES TABLE_ONE ([KEY]),
VALUE INTEGER NOT NULL
);
我创建了一个 monad 来执行一些 INSERT parent/SELECT parent/INSERT child/SELECT 子类操作。
import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;
import java.sql.Connection;
public final class Test {
public static void main(String[] args) {
Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get();
Database db = Database.from(con);
Observable<Integer> inputs = Observable.just(100,200,300);
db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)")
.parameters(inputs)
.returnGeneratedKeys()
.getAs(Integer.class)
.flatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
.parameter(k)
.get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
).flatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
.parameter(t1.id)
.parameter(t1.value)
.returnGeneratedKeys()
.getAs(Integer.class)
).flatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
.parameter(k)
.get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
).subscribe(System.out::println, Throwable::printStackTrace);
db.close();
}
private static final class Type1 {
private final int id;
private final int value;
private Type1(int id, int value) {
this.id = id;
this.value = value;
}
}
private static final class Type2 {
private final int id;
private final int foreignId;
private final int value;
private Type2(int id, int foreignKey, int value) {
this.id = id;
this.foreignId = foreignKey;
this.value = value;
}
@Override
public String toString() {
return "Type2{" +
"id=" + id +
", foreignId=" + foreignId +
", value=" + value +
'}';
}
}
}
更具体地说,这是对所有三个数字 (100、200、300) 发生的过程...
1) INSERT a TABLE_ONE record, get its primary key ID
2) SELECT that TABLE_ONE record with ID
3) Turn it into a Type1 Object
4) INSERT a TABLE_TWO record with Type1's `id for the foreign key (and get primary key ID)
5) SELECT TABLE_TWO record with ID
6) Turn it into a Type2 Object
这一切都是原子发生的,每 100、200、300 个值和 4 updates/queries 发生在这个链中的每个值。
但是,我收到 SQLITE_INTERRUPT
错误
java.sql.SQLException: [SQLITE_INTERRUPT] Operation terminated by sqlite3_interrupt() (interrupted)
at org.sqlite.core.DB.newSQLException(DB.java:890)
at org.sqlite.core.DB.newSQLException(DB.java:901)
at org.sqlite.core.DB.throwex(DB.java:868)
at org.sqlite.jdbc3.JDBC3ResultSet.next(JDBC3ResultSet.java:93)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.processRow(QuerySelectProducer.java:112)
at com.github.davidmoten.rx.jdbc.QuerySelectProducer.requestSome(QuerySelectProducer.java:75)
但是第一项成功地通过链推送,插入到两个表中,并打印,尽管我还有两个值(200 和 300)要走。
Type2{id=1, foreignId=1, value=100}
我的理论是,当每个发出的项目 O
从一个查询推送到下一个查询时,它会中断并取消前一个查询的迭代,如 X
QUERY OP 4----------------------O-
QUERY OP 3----------------O-----X-
QUERY OP 2------------O---X-------
QUERY OP 1-------O----X-----------
因此,第一个发出的项目通过了,但它在它后面留下了一系列中断的查询来驱动当前的查询,因为查询迭代被终止,所以下一个项目没有办法 onNext()
' .
SQLite 和 RxJava 的朋友们,你们能想办法解决这个问题吗?我可以配置 SQLite 设置来停止此中断吗?或者是否有 RxJava 组合技巧可以防止中断?
我还使用上面的测试创建了一个简单的 Git 存储库。 https://github.com/thomasnield/rxjava_sqlite_test
似乎与这样一个事实有关,即使用 SQLite,你不能同时拥有一个突出的游标来表示 SELECT 和 COMMIT,这是默认情况下在 INSERT 和 UPDATE 之后隐含的自动提交模式。这可以防止简单地通过游标遍历 SELECT 产生的行并执行 INSERT。解决方法是简单地获取所有结果,然后循环遍历它们。我不喜欢它。
我不知道这个库,但在查看了它的某些部分后,我确定它使用游标来延迟获取结果,这就是导致问题的原因。您可以通过强制它获取每个 SELECT.
的所有结果来解决此问题虽然肯定不理想,但Dan的解释似乎证实了我的担心。幸运的是,有一些方法可以解决这个问题。即使像 toList()
这样用于此目的的运算符在某种程度上是 Rx 的反模式,它也比根本不使用 Rx 要好得多。
在这里,我使用 toList().concatMap(Observable::from)
来收集所有排放量,并在每个阶段之间再次拉平它们。
import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable;
import java.sql.Connection;
public final class Test {
public static void main(String[] args) {
Connection con = new ConnectionProviderFromUrl("jdbc:sqlite:C:/Users/Thomas/test.db").get();
Database db = Database.from(con);
Observable<Integer> inputs = Observable.just(100,200,300);
db.update("INSERT INTO TABLE_ONE (VALUE) VALUES (?)")
.parameters(inputs)
.returnGeneratedKeys()
.getAs(Integer.class)
.toList().concatMap(Observable::from)
.concatMap(k -> db.select("SELECT * FROM TABLE_ONE WHERE ID = ?")
.parameter(k)
.get(rs -> new Type1(rs.getInt("ID"), rs.getInt("VALUE")))
)
.toList().concatMap(Observable::from)
.concatMap(t1 -> db.update("INSERT INTO TABLE_TWO (FOREIGN_ID,VALUE) VALUES (?,?)")
.parameter(t1.id)
.parameter(t1.value)
.returnGeneratedKeys()
.getAs(Integer.class)
).toList().concatMap(Observable::from)
.concatMap(k -> db.select("SELECT * FROM TABLE_TWO WHERE ID = ?")
.parameter(k)
.get(rs -> new Type2(rs.getInt("ID"), rs.getInt("FOREIGN_ID"), rs.getInt("VALUE")))
).subscribe(System.out::println, Throwable::printStackTrace);
db.close();
}
private static final class Type1 {
private final int id;
private final int value;
private Type1(int id, int value) {
this.id = id;
this.value = value;
}
}
private static final class Type2 {
private final int id;
private final int foreignId;
private final int value;
private Type2(int id, int foreignKey, int value) {
this.id = id;
this.foreignId = foreignKey;
this.value = value;
}
@Override
public String toString() {
return "Type2{" +
"id=" + id +
", foreignId=" + foreignId +
", value=" + value +
'}';
}
}
}