How can data be leaking between these 2 concurrent transactions?
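I have a project running on Java 8, Spring Boot 1.5.9, HSQLDB 2.4.0, and jdbi3-core 3.0.0. I'm writing tests to verify database behavior. One set of tests checks concurrency behavior.
My most basic test is somehow leaking data between 2 transactions, and I can't figure out why. I don't know whether my test is faulty, Jdbi has a bug, or HSQLDB is fundamentally broken.
Connection URL: jdbc:hsqldb:${...};create=true;hsqldb.tx=mvlocks;hsqldb.tx_level=serializable
Note tx=mvlocks and tx_level=serializable. Setting tx=mvcc, so that tx_level becomes snapshot isolation, doesn't change the outcome. That is even stranger, because snapshots definitely should not be affected by each other's changes.
Test description:
- create a table with 1 record
- start 2 threads
- in each thread, start a new transaction at the same time (ensured by synchronizing through a CyclicBarrier, verified through log output), so that neither transaction should be able to see the other's changes
- in thread 1, insert a new row (2 rows total) and commit
- in thread 2, count the rows
My expectation is that thread 2's transaction should see only 1 record in the table, because thread 2's transaction started before thread 1's transaction committed and the isolation level is set to serializable.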
CyclicBarrier syncLock = new CyclicBarrier(2);
Runnable sync = Unchecked.runnable(() -> syncLock.await(1, TimeUnit.SECONDS));

jdbi.useTransaction(tx -> {
    Queries queries = tx.attach(Queries.class);
    queries.create();
    // inserts a row with id 1
    queries.insert(1, 1);
});

CompletableFuture<Void> first = CompletableFuture.runAsync(Unchecked.runnable(() -> {
    jdbi.useTransaction(tx -> {
        assertThat(tx.isInTransaction()).isTrue();
        assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);
        LOGGER.info("first tx started");
        sync.run();
        Queries queries = tx.attach(Queries.class);
        queries.insert(2, 2);
    });
    LOGGER.info("first tx committed");
    Thread.sleep(100);
    sync.run();
}));

CompletableFuture<Integer> subject = CompletableFuture.supplyAsync(Unchecked.supplier(() -> {
    int out = jdbi.inTransaction(tx -> {
        assertThat(tx.isInTransaction()).isTrue();
        assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);
        LOGGER.info("second tx started");
        sync.run();
        sync.run();
        Queries queries = tx.attach(Queries.class);
        // counts the number of rows
        return queries.count();
    });
    LOGGER.info("second tx committed");
    return out;
}));

// capture exceptions from either thread
CompletableFuture.allOf(first, subject).get();
assertThat(subject.get()).isEqualTo(1);
Output:
01:28:16.255 INFO (SQL) [net.leaumar.samstock.integration.db.Queries] {main}: took <1ms to execute query create table test(id int primary key, foo int) with arguments { positional:{}, named:{}, finder:[]}
01:28:16.257 INFO (SQL) [net.leaumar.samstock.integration.db.Queries] {main}: took <1ms to execute query insert into test(id, foo) values(:id, :foo) with arguments { positional:{0:1,1:1}, named:{foo:1,id:1}, finder:[]}
01:28:16.313 INFO () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-2}: second tx started
01:28:16.313 INFO () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-1}: first tx started
01:28:16.315 INFO (SQL) [net.leaumar.samstock.integration.db.Queries] {ForkJoinPool.commonPool-worker-1}: took <1ms to execute query insert into test(id, foo) values(:id, :foo) with arguments { positional:{0:2,1:2}, named:{foo:2,id:2}, finder:[]}
01:28:16.315 INFO () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-1}: first tx committed
01:28:16.317 INFO (SQL) [net.leaumar.samstock.integration.db.Queries] {ForkJoinPool.commonPool-worker-2}: took <1ms to execute query select count(*) from test with arguments { positional:{}, named:{}, finder:[]}
01:28:16.318 INFO () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-2}: second tx committed
org.junit.ComparisonFailure:
Expected :1
Actual :2
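This basic JDBC test doesn't show this "read committed"-like behavior. The test is green, showing that serializable transactions started at the same time cannot see each other's changes, even after a commit: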
@Test
public void transactionsAreIsolated() throws SQLException {
    @Cleanup
    Connection connection = dataSource.getConnection();
    Statement statement = connection.createStatement();
    statement.execute(CREATE_TABLE);

    @Cleanup
    Connection c1 = dataSource.getConnection();
    @Cleanup
    Connection c2 = dataSource.getConnection();
    c1.setAutoCommit(false);
    c2.setAutoCommit(false);

    startTransaction(c1);
    startTransaction(c2);
    assertThat(count(c1)).isEqualTo(0);
    assertThat(count(c2)).isEqualTo(0);

    insert(c1, 1);
    assertThat(count(c1)).isEqualTo(1);
    assertThat(count(c2))
        .describedAs("read uncommitted")
        .isEqualTo(0);

    c1.commit();
    assertThat(count(c1)).isEqualTo(1);
    assertThat(count(c2))
        .describedAs("read committed")
        .isEqualTo(0);
}
I replicated your JDBC test in Jdbi, and it works:
@Test
public void transactionsAreIsolated() {
    try (Handle h1 = jdbi.open();
         Handle h2 = jdbi.open()) {
        h1.begin();
        h2.begin();

        assertThat(count(h1)).isEqualTo(0);
        assertThat(count(h2)).isEqualTo(0); // locks h2's txn to the current snapshot

        insert(h1, 1, 1);
        assertThat(count(h1)).isEqualTo(1);
        assertThat(count(h2)).describedAs("read uncommitted").isEqualTo(0);

        h1.commit();
        assertThat(count(h1)).isEqualTo(1);
        assertThat(count(h2)).describedAs("read committed").isEqualTo(0);

        h2.rollback();
    }
}
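From testing, it appears that a transaction isn't actually locked to a database snapshot until you interact with the database inside that transaction.
In the test above, we observe the row count through h2 before inserting the row through h1. That interaction establishes the transaction's snapshot, which is why your JDBC test succeeds.
However, if we modify the test above so that the h1 transaction ends before we observe the count on h2, h2 sees the committed row: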
@Test
public void transactionsLockToStateWhenObserved() {
    try (Handle h1 = jdbi.open();
         Handle h2 = jdbi.open()) {
        h1.begin();
        h2.begin();

        insert(h1, 1, 1);
        assertThat(count(h1)).isEqualTo(1);
        h1.commit();

        assertThat(count(h2))
            .describedAs("_now_ we're locked to a snapshot")
            .isEqualTo(1);

        h2.rollback();
    }
}
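As a rough mental model of what the two tests above show, here is a toy, database-free sketch in which a transaction pins its snapshot on its first read rather than at begin. ToyDb and ToyTx are hypothetical names invented for this illustration. This is not how HSQLDB or Jdbi is implemented, only a model of the observed behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: transactions pin their snapshot on the first read. */
class LazySnapshotDemo {

    /** Hypothetical in-memory "database" holding committed row ids. */
    static class ToyDb {
        private final List<Integer> committed = new ArrayList<>();

        synchronized void commitInsert(int id) {
            committed.add(id);
        }

        synchronized List<Integer> copyOfCommitted() {
            return new ArrayList<>(committed);
        }

        ToyTx begin() {
            return new ToyTx(this);
        }
    }

    /** Hypothetical transaction that takes its snapshot lazily. */
    static class ToyTx {
        private final ToyDb db;
        private List<Integer> snapshot; // null until the first read

        ToyTx(ToyDb db) {
            this.db = db;
        }

        int count() {
            if (snapshot == null) {
                // The snapshot is pinned here, not in begin().
                snapshot = db.copyOfCommitted();
            }
            return snapshot.size();
        }
    }

    /** The reader touches the database before the writer commits. */
    static int countWhenReadBeforeCommit() {
        ToyDb db = new ToyDb();
        ToyTx reader = db.begin();
        reader.count();        // pins the empty snapshot
        db.commitInsert(1);    // another transaction commits a row
        return reader.count(); // still answers from the pinned snapshot
    }

    /** The reader begins first but reads only after the writer commits. */
    static int countWhenReadAfterCommit() {
        ToyDb db = new ToyDb();
        ToyTx reader = db.begin();
        db.commitInsert(1);
        return reader.count(); // first read happens now, so the row is visible
    }

    public static void main(String[] args) {
        System.out.println("read before commit: " + countWhenReadBeforeCommit());
        System.out.println("read after commit:  " + countWhenReadAfterCommit());
    }
}
```

In this model the first scenario corresponds to transactionsAreIsolated and the second to transactionsLockToStateWhenObserved: the only difference is whether the reader's first query runs before or after the other side commits.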
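Your original test has two synchronization points (transactions started, transaction 1 committed), but it needs four to fully test your scenario: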
@Test
public void concurrentTransactionsAreIsolated() throws Exception {
    CyclicBarrier barrier = new CyclicBarrier(2);
    Runnable sync = uncheckedRunnable(() -> barrier.await(1, TimeUnit.SECONDS));

    jdbi.useTransaction(handle -> insert(handle, 1, 1));

    CompletableFuture<Void> first = CompletableFuture.runAsync(uncheckedRunnable(() -> {
        jdbi.useTransaction(tx -> {
            assertThat(tx.isInTransaction()).isTrue();
            assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);
            log.info("first tx started");
            sync.run(); // wait for both transactions to start
            insert(tx, 2, 2);
            log.info("first tx inserted row");
            sync.run(); // let the second txn check uncommitted reads
            sync.run(); // wait for second txn to check the uncommitted reads
        });
        log.info("first tx committed");
        sync.run(); // transaction closed, let second transaction check committed reads
    }));

    CompletableFuture<Integer> subject = CompletableFuture.supplyAsync(uncheckedSupplier(() -> {
        int out = jdbi.inTransaction(tx -> {
            assertThat(tx.isInTransaction()).isTrue();
            assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);
            log.info("second tx started");
            sync.run(); // wait for both transactions to start
            sync.run(); // wait for first txn to insert
            log.info("second tx checking uncommitted read");
            assertThat(count(tx)).isEqualTo(1);
            sync.run(); // let the first txn commit
            sync.run(); // wait for first txn to commit
            log.info("second tx checking committed read");
            return count(tx);
        });
        log.info("second tx committed");
        return out;
    }));

    // capture exceptions from either thread
    CompletableFuture.allOf(first, subject).get();
    assertThat(subject.get()).isEqualTo(1);
}
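To look at the four-barrier handshake in isolation, here is a database-free sketch that uses only java.util.concurrent. The BarrierHandshakeDemo class and the event strings are made up for illustration, and it uses a dedicated two-thread pool instead of the common pool, which is an assumption of this sketch and not part of the test above. The point is only that each side does its work between barrier crossings, so the interleaving is forced.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch of the four-point handshake; no database involved. */
class BarrierHandshakeDemo {

    static List<String> run() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        List<String> events = new CopyOnWriteArrayList<>();
        // Wrap the checked exceptions of await() so the lambda fits Runnable.
        Runnable sync = () -> {
            try {
                barrier.await(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };

        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Void> writer = CompletableFuture.runAsync(() -> {
                sync.run(); // 1: both sides have started
                events.add("writer inserts");
                sync.run(); // 2: insert done, reader may look
                sync.run(); // 3: reader has finished its first look
                events.add("writer commits");
                sync.run(); // 4: commit done, reader may look again
            }, pool);

            CompletableFuture<Void> reader = CompletableFuture.runAsync(() -> {
                sync.run(); // 1: both sides have started
                sync.run(); // 2: wait for the insert
                events.add("reader checks uncommitted");
                sync.run(); // 3: let the writer commit
                sync.run(); // 4: wait for the commit
                events.add("reader checks committed");
            }, pool);

            // Surface failures from either thread.
            CompletableFuture.allOf(writer, reader).get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because every event sits strictly between two barrier crossings, the recorded order is always insert, uncommitted check, commit, committed check. With only two barriers, as in the original test, the reader's first query is free to run after the writer's commit.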