这两个并发事务之间的数据如何泄漏?

How can data be leaking between these 2 concurrent transactions?

我在 java 8 上有一个项目 运行,spring boot 1.5.9,hsqldb 2.4.0,jdbi3-core 3.0.0。我正在编写测试来验证数据库行为。一组测试检查并发行为。

我最基本的测试是在 2 个事务之间以某种方式泄漏数据,我不知道为什么。不知道是我测试有问题,是jdbi有bug,还是hqsldb根本坏了。

连接 url jdbc:hsqldb:${...};create=true;hsqldb.tx=mvlocks;hsqldb.tx_level=serializable

注意 tx=mvlocks 和 tx_level=serializable。设置 tx=mvcc 以便 tx_level 成为快照隔离不会改变结果,这更奇怪,因为 快照 在任何更改应该 肯定不能互相影响

测试说明:

  1. 创建一个 table 有 1 条记录
  2. 启动 2 个线程
  3. 在每个交易中,同时开始一个新交易(通过 CyclicBarrier 同步确保,通过日志输出验证),这样两个交易都不能看到对方的变化
  4. 在线程 1 中,插入一个新行(共 2 行)并提交
  5. 在线程2中,统计行数

我的期望是线程 2 的事务在 table 中应该只有 1 条记录,因为线程 2 的事务在线程 1 的事务提交之前启动并且隔离级别设置为可序列化。

    CyclicBarrier syncLock = new CyclicBarrier(2);
    Runnable sync = Unchecked.runnable(() -> syncLock.await(1, TimeUnit.SECONDS));

    jdbi.useTransaction(tx -> {
        Queries queries = tx.attach(Queries.class);
        queries.create();
        // inserts a row with id 1
        queries.insert(1, 1);
    });

    CompletableFuture<Void> first = CompletableFuture.runAsync(Unchecked.runnable(() -> {
        jdbi.useTransaction(tx -> {
            assertThat(tx.isInTransaction()).isTrue();
            assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);

            LOGGER.info("first tx started");
            sync.run();

            Queries queries = tx.attach(Queries.class);
            queries.insert(2, 2);
        });
        LOGGER.info("first tx committed");

        Thread.sleep(100);

        sync.run();
    }));

    CompletableFuture<Integer> subject = CompletableFuture.supplyAsync(Unchecked.supplier(() -> {
        int out = jdbi.inTransaction(tx -> {
            assertThat(tx.isInTransaction()).isTrue();
            assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);

            LOGGER.info("second tx started");
            sync.run();
            sync.run();

            Queries queries = tx.attach(Queries.class);
            // counts the number of rows
            return queries.count();
        });
        LOGGER.info("second tx committed");

        return out;
    }));

    // capture exceptions from either thread
    CompletableFuture.allOf(first, subject).get();

    assertThat(subject.get()).isEqualTo(1);

输出:

01:28:16.255 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] {main}: took <1ms to execute query create table test(id int primary key, foo int) with arguments { positional:{}, named:{}, finder:[]}
01:28:16.257 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] {main}: took <1ms to execute query insert into test(id, foo) values(:id, :foo) with arguments { positional:{0:1,1:1}, named:{foo:1,id:1}, finder:[]}
01:28:16.313 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-2}: second tx started
01:28:16.313 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-1}: first tx started
01:28:16.315 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] {ForkJoinPool.commonPool-worker-1}: took <1ms to execute query insert into test(id, foo) values(:id, :foo) with arguments { positional:{0:2,1:2}, named:{foo:2,id:2}, finder:[]}
01:28:16.315 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-1}: first tx committed
01:28:16.317 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] {ForkJoinPool.commonPool-worker-2}: took <1ms to execute query select count(*) from test with arguments { positional:{}, named:{}, finder:[]}
01:28:16.318 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] {ForkJoinPool.commonPool-worker-2}: second tx committed

org.junit.ComparisonFailure: 
Expected :1
Actual   :2

此基本 jdbc 测试未显示此 "read committed" 类行为。测试是绿色的,证明同时启动的可序列化事务即使提交后也看不到彼此的变化:

@Test
public void transactionsAreIsolated() throws SQLException {
    @Cleanup
    Connection connection = dataSource.getConnection();
    Statement statement = connection.createStatement();
    statement.execute(CREATE_TABLE);

    @Cleanup
    Connection c1 = dataSource.getConnection();
    @Cleanup
    Connection c2 = dataSource.getConnection();

    c1.setAutoCommit(false);
    c2.setAutoCommit(false);

    startTransaction(c1);
    startTransaction(c2);

    assertThat(count(c1)).isEqualTo(0);
    assertThat(count(c2)).isEqualTo(0);

    insert(c1, 1);
    assertThat(count(c1)).isEqualTo(1);
    assertThat(count(c2))
        .describedAs("read uncommitted")
        .isEqualTo(0);

    c1.commit();
    assertThat(count(c1)).isEqualTo(1);
    assertThat(count(c2))
        .describedAs("read committed")
        .isEqualTo(0);
}

我在 Jdbi 中复制了您的 JDBC 测试,它有效:

@Test
public void transactionsAreIsolated() {
  try (Handle h1 = jdbi.open();
       Handle h2 = jdbi.open()) {
    h1.begin();
    h2.begin();

    assertThat(count(h1)).isEqualTo(0); 
    assertThat(count(h2)).isEqualTo(0); // locks h2's txn to the current snapshot

    insert(h1, 1, 1);

    assertThat(count(h1)).isEqualTo(1);
    assertThat(count(h2)).describedAs("read uncommitted").isEqualTo(0);

    h1.commit();

    assertThat(count(h1)).isEqualTo(1);
    assertThat(count(h2)).describedAs("read committed").isEqualTo(0);

    h2.rollback();
  }
}

从测试看来,在您实际与事务中的数据库交互之前,事务实际上并未锁定到数据库快照。

在上面的测试中,我们在通过h1插入行之前通过h2观察行数。此交互设置了事务快照,这就是您的 JDBC 测试成功的原因。

但是,如果我们修改上述测试以在观察 h2 上的计数之前结束 h1 事务:

@Test
public void transactionsLockToStateWhenObserved() {
  try (Handle h1 = jdbi.open();
       Handle h2 = jdbi.open()) {
    h1.begin();
    h2.begin();

    insert(h1, 1, 1);

    assertThat(count(h1)).isEqualTo(1);

    h1.commit();

    assertThat(count(h2))
        .describedAs("_now_ we're locked to a snapshot")
        .isEqualTo(1);

    h2.rollback();
  }
}

您的原始测试有两个同步点(事务开始,事务 1 已提交),但需要四个才能完整测试您的场景:

@Test
public void concurrentTransactionsAreIsolated() throws Exception {
  CyclicBarrier barrier = new CyclicBarrier(2);
  Runnable sync = uncheckedRunnable(() -> barrier.await(1, TimeUnit.SECONDS));

  jdbi.useTransaction(handle -> insert(handle, 1, 1));

  CompletableFuture<Void> first = CompletableFuture.runAsync(uncheckedRunnable(() -> {
    jdbi.useTransaction(tx -> {
      assertThat(tx.isInTransaction()).isTrue();
      assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);

      log.info("first tx started");
      sync.run(); // wait for both transactions to start

      insert(tx, 2, 2);

      log.info("first tx inserted row");
      sync.run(); // let the second txn check uncommitted reads
      sync.run(); // wait for second txn to check the uncommitted reads
    });

    log.info("first tx committed");
    sync.run(); // transaction closed, let second transaction check committed reads
  }));

  CompletableFuture<Integer> subject = CompletableFuture.supplyAsync(uncheckedSupplier(() -> {
    int out = jdbi.inTransaction(tx -> {
      assertThat(tx.isInTransaction()).isTrue();
      assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);

      log.info("second tx started");
      sync.run(); // wait for both transactions to start
      sync.run(); // wait for first txn to insert

      log.info("second tx checking uncommitted read");
      assertThat(count(tx)).isEqualTo(1);

      sync.run(); // let the first txn commit
      sync.run(); // wait for first txn to commit

      log.info("second tx checking committed read");
      return count(tx);
    });
    log.info("second tx committed");

    return out;
  }));

  // capture exceptions from either thread
  CompletableFuture.allOf(first, subject).get();

  assertThat(subject.get()).isEqualTo(1);
}