如果数据已更改,则不要更新 ResultSet 中的行

Do not update row in ResultSet if data has changed

我们正在从各种数据库类型(Oracle、MySQL、SQL-Server 等)中提取数据。成功写入文件后,我们希望将其标记为已传输,因此我们更新特定列。

我们的问题是,用户有可能同时更改数据但可能会忘记提交。记录被 select for update 语句阻塞。所以可能会发生,我们将某些东西标记为已传输,而实际上没有。

这是我们代码的摘录:

Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
ResultSet extractedData = stmt.executeQuery(sql);
writeDataToFile(extractedData);
extractedData.beforeFirst();
while (extractedData.next()) {
    if (!extractedData.rowUpdated()) {
        extractedData.updateString("COLUMNNAME", "TRANSMITTED");
        // code will stop here if user has changed data but did not commit
        extractedData.updateRow();
        // once committed the changed data is marked as transmitted
    }
}

方法 extractedData.rowUpdated() returns false,因为从技术上讲用户还没有改变任何东西。 有没有办法不更新行并检测数据是否在这个后期被更改?

很遗憾,我无法更改用户用来更改数据的程序。

所以你想

  • 运行遍历 table 中所有未导出的行
  • 将此数据导出到某处
  • 将这些行标记为已导出,以便您的下一次迭代不会再次导出它们
  • 由于可能有连续的待定更改,您不希望弄乱该信息

怎么样:

You iterate over all rows. 

for every row 
   generate a hash value for the contents of the row
   compare column "UPDATE_STATUS" with calulated hash
   if no match
     export row
     store hash into "UPDATE_STATUS" 
      if store fails (row locked) 
         -> no worries, will be exported again next time
      if store succeeds (on data already changed by user) 
         -> no worries, will be exported again as hash will not match

这可能会进一步减慢您的导出速度,因为您必须遍历所有内容而不是遍历所有内容 WHERE UPDATE_STATUS IS NULL 但您可以同时完成两项工作 - 一项(快速) 迭代 WHERE UPDATE_STATUS IS NULL 和一个缓慢而彻底的 WHERE UPDATE_STATUS IS NOT NULL (哈希重新检查到位)

如果您想避免 store-failures/waits,您可能希望将散列/更新信息存储到第二个 table 复制主键加上散列字段值 - 这样用户 主 table 上的锁根本不会干扰您的更新(因为那些会在另一个 table 上)

"a user [...] might forget to commit" > 用户要么提交,要么不提交。 "Forgetting" 提交等同于他的软件中的错误。

要解决此问题,您需要:

  • 启动隔离级别为 SERIALIZABLE 的事务,并在该事务中:
    • 读取数据并导出。以这种方式读取的数据被阻止更新。
    • 更新您处理的数据。注意:不要使用可更新的 ResultSet 执行此操作,而使用 UPDATE 语句执行此操作。这样你就不需要比 CONCUR_READ_ONLY + TYPE_FORWARD_ONLY.
    • 慢得多的 CONCUR_UPDATABLE + TYPE_SCROLL_SENSITIVE
  • 提交交易。

这样,有缺陷的软件将无法更新您正在处理的数据。

另一种方式

  • 在较低的隔离级别(默认 READ COMMITTED)并在该事务中启动 TRANSACTION
    • Select 具有适当 Table 提示的数据 例如 SQL 服务器 theseTABLOCKX + HOLDLOCK(大型数据集),或 ROWLOCK + XLOCK + HOLDLOCK (小数据集),或 PAGLOCK + XLOCK + HOLDLOCK。将 HOLDLOCK 作为 table 提示实际上等同于进行 SERIALIZABLE 交易。请注意,如果锁的数量变得太多,锁升级可能会将后两者升级为 table 个锁。
    • 更新您处理的数据;注意:使用 UPDATE 语句。丢失 updatable/scroll_sensitive 结果集。
  • 提交交易。

同样,有漏洞的软件将被阻止更新您正在处理的数据。

最后不得不实现乐观锁。在某些 table 中,我们已经有一个存储版本号的列。其他一些 tables 有一个时间戳列,用于保存最后一次更改(由触发器更改)的时间。

虽然时间戳可能并不总是乐观锁定的可靠来源,但我们还是选择了它。在我们的环境中,一秒钟内的几个变化不太现实。

由于我们必须知道主键而不事先描述它,所以我们必须访问结果集元数据。我们的一些数据库不支持此功能(例如 DB/2 遗留 tables)。这些我们还在用老系统。

注意:tableMetaData 是一个 XML-config 文件,其中存储了我们对 table 的描述。这与数据库中 table 的元数据没有直接关系。

Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
ResultSet extractedData = stmt.executeQuery(sql);
writeDataToFile(extractedData);
extractedData.beforeFirst();
while (extractedData.next()) {
    if (tableMetaData.getVersion() != null) {
        markDataAsExported(extractedData, tableMetaData);
    } else {
        markResultSetAsExported(extractedData, tableMetaData);
    }
}

// new way with building of an update statement including the version column in the where clause
private void markDataAsExported(ResultSet extractedData, TableMetaData tableMetaData) throws SQLException {
    ResultSet resultSetPrimaryKeys = null;
    PreparedStatement versionedUpdateStatement = null;
    try {
        ResultSetMetaData extractedMetaData = extractedData.getMetaData();
        resultSetPrimaryKeys = conn.getMetaData().getPrimaryKeys(null, null, tableMetaData.getTable());
        ArrayList<String> primaryKeyList = new ArrayList<String>();
        String sqlStatement = "update " + tableMetaData.getTable() + " set " + tableMetaData.getUpdateColumn()
                + " = ? where ";
        if (resultSetPrimaryKeys.isBeforeFirst()) {
            while (resultSetPrimaryKeys.next()) {
                primaryKeyList.add(resultSetPrimaryKeys.getString(4));
                sqlStatement += resultSetPrimaryKeys.getString(4) + " = ? and ";
            }
            sqlStatement += tableMetaData.getVersionColumn() + " = ?";
            versionedUpdateStatement = conn.prepareStatement(sqlStatement);
            while (extractedData.next()) {
                versionedUpdateStatement.setString(1, tableMetaData.getUpdateValue());
                for (int i = 0; i < primaryKeyList.size(); i++) {
                    versionedUpdateStatement.setObject(i + 2, extractedData.getObject(primaryKeyList.get(i)),
                            extractedMetaData.getColumnType(extractedData.findColumn(primaryKeyList.get(i))));
                }
                versionedUpdateStatement.setObject(primaryKeyList.size() + 2,
                        extractedData.getObject(tableMetaData.getVersionColumn()), tableMetaData.getVersionType());
                if (versionedUpdateStatement.executeUpdate() == 0) {
                    logger.warn(Message.COLLECTOR_DATA_CHANGED, tableMetaData.getTable());
                }
            }
        } else {
            logger.warn(Message.COLLECTOR_PK_ERROR, tableMetaData.getTable());
            markResultSetAsExported(extractedData, tableMetaData);
        }
    } finally {
        if (resultSetPrimaryKeys != null) {
            resultSetPrimaryKeys.close();
        }
        if (versionedUpdateStatement != null) {
            versionedUpdateStatement.close();
        }
    }
}

//the old way as fallback
private void markResultSetAsExported(ResultSet extractedData, TableMetaData tableMetaData) throws SQLException {
    while (extractedData.next()) {
        extractedData.updateString(tableMetaData.getUpdateColumn(), tableMetaData.getUpdateValue());
        extractedData.updateRow();
    }
}