如果数据已更改,则不要更新 ResultSet 中的行
Do not update row in ResultSet if data has changed
我们正在从各种数据库类型(Oracle、MySQL、SQL-Server 等)中提取数据。成功写入文件后,我们希望将其标记为已传输,因此我们更新特定列。
我们的问题是,用户有可能同时更改数据但可能会忘记提交。记录被 select for update 语句阻塞。所以可能会发生,我们将某些东西标记为已传输,而实际上没有。
这是我们代码的摘录:
Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
ResultSet extractedData = stmt.executeQuery(sql);
writeDataToFile(extractedData);
extractedData.beforeFirst();
while (extractedData.next()) {
if (!extractedData.rowUpdated()) {
extractedData.updateString("COLUMNNAME", "TRANSMITTED");
// code will stop here if user has changed data but did not commit
extractedData.updateRow();
// once committed the changed data is marked as transmitted
}
}
方法 extractedData.rowUpdated()
returns false,因为从技术上讲用户还没有改变任何东西。
有没有办法不更新行并检测数据是否在这个后期被更改?
很遗憾,我无法更改用户用来更改数据的程序。
所以你想
- 运行遍历 table 中所有未导出的行
- 将此数据导出到某处
- 将这些行标记为已导出,以便您的下一次迭代不会再次导出它们
- 由于可能有连续的待定更改,您不希望弄乱该信息
怎么样:
You iterate over all rows.
for every row
generate a hash value for the contents of the row
compare column "UPDATE_STATUS" with calulated hash
if no match
export row
store hash into "UPDATE_STATUS"
if store fails (row locked)
-> no worries, will be exported again next time
if store succeeds (on data already changed by user)
-> no worries, will be exported again as hash will not match
这可能会进一步减慢您的导出速度,因为您必须遍历所有内容而不是遍历所有内容 WHERE UPDATE_STATUS IS NULL
但您可以同时完成两项工作 - 一项(快速)
迭代 WHERE UPDATE_STATUS IS NULL
和一个缓慢而彻底的 WHERE UPDATE_STATUS IS NOT NULL
(哈希重新检查到位)
如果您想避免 store-failures/waits,您可能希望将散列/更新信息存储到第二个 table 复制主键加上散列字段值 - 这样用户
主 table 上的锁根本不会干扰您的更新(因为那些会在另一个 table 上)
"a user [...] might forget to commit" > 用户要么提交,要么不提交。 "Forgetting" 提交等同于他的软件中的错误。
要解决此问题,您需要:
- 启动隔离级别为
SERIALIZABLE
的事务,并在该事务中:
- 读取数据并导出。以这种方式读取的数据被阻止更新。
- 更新您处理的数据。注意:不要使用可更新的
ResultSet
执行此操作,而使用 UPDATE
语句执行此操作。这样你就不需要比 CONCUR_READ_ONLY + TYPE_FORWARD_ONLY
. 慢得多的 CONCUR_UPDATABLE + TYPE_SCROLL_SENSITIVE
- 提交交易。
这样,有缺陷的软件将无法更新您正在处理的数据。
另一种方式
- 在较低的隔离级别(默认
READ COMMITTED
)并在该事务中启动 TRANSACTION
- Select 具有适当 Table 提示的数据 例如 SQL 服务器 these:
TABLOCKX + HOLDLOCK
(大型数据集),或 ROWLOCK + XLOCK + HOLDLOCK
(小数据集),或 PAGLOCK + XLOCK + HOLDLOCK
。将 HOLDLOCK
作为 table 提示实际上等同于进行 SERIALIZABLE
交易。请注意,如果锁的数量变得太多,锁升级可能会将后两者升级为 table 个锁。
- 更新您处理的数据;注意:使用
UPDATE
语句。丢失 updatable/scroll_sensitive 结果集。
- 提交交易。
同样,有漏洞的软件将被阻止更新您正在处理的数据。
最后不得不实现乐观锁。在某些 table 中,我们已经有一个存储版本号的列。其他一些 tables 有一个时间戳列,用于保存最后一次更改(由触发器更改)的时间。
虽然时间戳可能并不总是乐观锁定的可靠来源,但我们还是选择了它。在我们的环境中,一秒钟内的几个变化不太现实。
由于我们必须知道主键而不事先描述它,所以我们必须访问结果集元数据。我们的一些数据库不支持此功能(例如 DB/2 遗留 tables)。这些我们还在用老系统。
注意:tableMetaData
是一个 XML-config 文件,其中存储了我们对 table 的描述。这与数据库中 table 的元数据没有直接关系。
Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
ResultSet extractedData = stmt.executeQuery(sql);
writeDataToFile(extractedData);
extractedData.beforeFirst();
while (extractedData.next()) {
if (tableMetaData.getVersion() != null) {
markDataAsExported(extractedData, tableMetaData);
} else {
markResultSetAsExported(extractedData, tableMetaData);
}
}
// new way with building of an update statement including the version column in the where clause
private void markDataAsExported(ResultSet extractedData, TableMetaData tableMetaData) throws SQLException {
ResultSet resultSetPrimaryKeys = null;
PreparedStatement versionedUpdateStatement = null;
try {
ResultSetMetaData extractedMetaData = extractedData.getMetaData();
resultSetPrimaryKeys = conn.getMetaData().getPrimaryKeys(null, null, tableMetaData.getTable());
ArrayList<String> primaryKeyList = new ArrayList<String>();
String sqlStatement = "update " + tableMetaData.getTable() + " set " + tableMetaData.getUpdateColumn()
+ " = ? where ";
if (resultSetPrimaryKeys.isBeforeFirst()) {
while (resultSetPrimaryKeys.next()) {
primaryKeyList.add(resultSetPrimaryKeys.getString(4));
sqlStatement += resultSetPrimaryKeys.getString(4) + " = ? and ";
}
sqlStatement += tableMetaData.getVersionColumn() + " = ?";
versionedUpdateStatement = conn.prepareStatement(sqlStatement);
while (extractedData.next()) {
versionedUpdateStatement.setString(1, tableMetaData.getUpdateValue());
for (int i = 0; i < primaryKeyList.size(); i++) {
versionedUpdateStatement.setObject(i + 2, extractedData.getObject(primaryKeyList.get(i)),
extractedMetaData.getColumnType(extractedData.findColumn(primaryKeyList.get(i))));
}
versionedUpdateStatement.setObject(primaryKeyList.size() + 2,
extractedData.getObject(tableMetaData.getVersionColumn()), tableMetaData.getVersionType());
if (versionedUpdateStatement.executeUpdate() == 0) {
logger.warn(Message.COLLECTOR_DATA_CHANGED, tableMetaData.getTable());
}
}
} else {
logger.warn(Message.COLLECTOR_PK_ERROR, tableMetaData.getTable());
markResultSetAsExported(extractedData, tableMetaData);
}
} finally {
if (resultSetPrimaryKeys != null) {
resultSetPrimaryKeys.close();
}
if (versionedUpdateStatement != null) {
versionedUpdateStatement.close();
}
}
}
//the old way as fallback
private void markResultSetAsExported(ResultSet extractedData, TableMetaData tableMetaData) throws SQLException {
while (extractedData.next()) {
extractedData.updateString(tableMetaData.getUpdateColumn(), tableMetaData.getUpdateValue());
extractedData.updateRow();
}
}
我们正在从各种数据库类型(Oracle、MySQL、SQL-Server 等)中提取数据。成功写入文件后,我们希望将其标记为已传输,因此我们更新特定列。
我们的问题是,用户有可能同时更改数据但可能会忘记提交。记录被 select for update 语句阻塞。所以可能会发生,我们将某些东西标记为已传输,而实际上没有。
这是我们代码的摘录:
Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
ResultSet extractedData = stmt.executeQuery(sql);
writeDataToFile(extractedData);
extractedData.beforeFirst();
while (extractedData.next()) {
if (!extractedData.rowUpdated()) {
extractedData.updateString("COLUMNNAME", "TRANSMITTED");
// code will stop here if user has changed data but did not commit
extractedData.updateRow();
// once committed the changed data is marked as transmitted
}
}
方法 extractedData.rowUpdated()
returns false,因为从技术上讲用户还没有改变任何东西。
有没有办法不更新行并检测数据是否在这个后期被更改?
很遗憾,我无法更改用户用来更改数据的程序。
所以你想
- 运行遍历 table 中所有未导出的行
- 将此数据导出到某处
- 将这些行标记为已导出,以便您的下一次迭代不会再次导出它们
- 由于可能有连续的待定更改,您不希望弄乱该信息
怎么样:
You iterate over all rows.
for every row
generate a hash value for the contents of the row
compare column "UPDATE_STATUS" with calulated hash
if no match
export row
store hash into "UPDATE_STATUS"
if store fails (row locked)
-> no worries, will be exported again next time
if store succeeds (on data already changed by user)
-> no worries, will be exported again as hash will not match
这可能会进一步减慢您的导出速度,因为您必须遍历所有内容而不是遍历所有内容 WHERE UPDATE_STATUS IS NULL
但您可以同时完成两项工作 - 一项(快速)
迭代 WHERE UPDATE_STATUS IS NULL
和一个缓慢而彻底的 WHERE UPDATE_STATUS IS NOT NULL
(哈希重新检查到位)
如果您想避免 store-failures/waits,您可能希望将散列/更新信息存储到第二个 table 复制主键加上散列字段值 - 这样用户 主 table 上的锁根本不会干扰您的更新(因为那些会在另一个 table 上)
"a user [...] might forget to commit" > 用户要么提交,要么不提交。 "Forgetting" 提交等同于他的软件中的错误。
要解决此问题,您需要:
- 启动隔离级别为
SERIALIZABLE
的事务,并在该事务中:- 读取数据并导出。以这种方式读取的数据被阻止更新。
- 更新您处理的数据。注意:不要使用可更新的
ResultSet
执行此操作,而使用UPDATE
语句执行此操作。这样你就不需要比CONCUR_READ_ONLY + TYPE_FORWARD_ONLY
. 慢得多的
CONCUR_UPDATABLE + TYPE_SCROLL_SENSITIVE
- 提交交易。
这样,有缺陷的软件将无法更新您正在处理的数据。
另一种方式
- 在较低的隔离级别(默认
READ COMMITTED
)并在该事务中启动TRANSACTION
- Select 具有适当 Table 提示的数据 例如 SQL 服务器 these:
TABLOCKX + HOLDLOCK
(大型数据集),或ROWLOCK + XLOCK + HOLDLOCK
(小数据集),或PAGLOCK + XLOCK + HOLDLOCK
。将HOLDLOCK
作为 table 提示实际上等同于进行SERIALIZABLE
交易。请注意,如果锁的数量变得太多,锁升级可能会将后两者升级为 table 个锁。 - 更新您处理的数据;注意:使用
UPDATE
语句。丢失 updatable/scroll_sensitive 结果集。
- Select 具有适当 Table 提示的数据 例如 SQL 服务器 these:
- 提交交易。
同样,有漏洞的软件将被阻止更新您正在处理的数据。
最后不得不实现乐观锁。在某些 table 中,我们已经有一个存储版本号的列。其他一些 tables 有一个时间戳列,用于保存最后一次更改(由触发器更改)的时间。
虽然时间戳可能并不总是乐观锁定的可靠来源,但我们还是选择了它。在我们的环境中,一秒钟内的几个变化不太现实。
由于我们必须知道主键而不事先描述它,所以我们必须访问结果集元数据。我们的一些数据库不支持此功能(例如 DB/2 遗留 tables)。这些我们还在用老系统。
注意:tableMetaData
是一个 XML-config 文件,其中存储了我们对 table 的描述。这与数据库中 table 的元数据没有直接关系。
Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
ResultSet extractedData = stmt.executeQuery(sql);
writeDataToFile(extractedData);
extractedData.beforeFirst();
while (extractedData.next()) {
if (tableMetaData.getVersion() != null) {
markDataAsExported(extractedData, tableMetaData);
} else {
markResultSetAsExported(extractedData, tableMetaData);
}
}
// new way with building of an update statement including the version column in the where clause
private void markDataAsExported(ResultSet extractedData, TableMetaData tableMetaData) throws SQLException {
ResultSet resultSetPrimaryKeys = null;
PreparedStatement versionedUpdateStatement = null;
try {
ResultSetMetaData extractedMetaData = extractedData.getMetaData();
resultSetPrimaryKeys = conn.getMetaData().getPrimaryKeys(null, null, tableMetaData.getTable());
ArrayList<String> primaryKeyList = new ArrayList<String>();
String sqlStatement = "update " + tableMetaData.getTable() + " set " + tableMetaData.getUpdateColumn()
+ " = ? where ";
if (resultSetPrimaryKeys.isBeforeFirst()) {
while (resultSetPrimaryKeys.next()) {
primaryKeyList.add(resultSetPrimaryKeys.getString(4));
sqlStatement += resultSetPrimaryKeys.getString(4) + " = ? and ";
}
sqlStatement += tableMetaData.getVersionColumn() + " = ?";
versionedUpdateStatement = conn.prepareStatement(sqlStatement);
while (extractedData.next()) {
versionedUpdateStatement.setString(1, tableMetaData.getUpdateValue());
for (int i = 0; i < primaryKeyList.size(); i++) {
versionedUpdateStatement.setObject(i + 2, extractedData.getObject(primaryKeyList.get(i)),
extractedMetaData.getColumnType(extractedData.findColumn(primaryKeyList.get(i))));
}
versionedUpdateStatement.setObject(primaryKeyList.size() + 2,
extractedData.getObject(tableMetaData.getVersionColumn()), tableMetaData.getVersionType());
if (versionedUpdateStatement.executeUpdate() == 0) {
logger.warn(Message.COLLECTOR_DATA_CHANGED, tableMetaData.getTable());
}
}
} else {
logger.warn(Message.COLLECTOR_PK_ERROR, tableMetaData.getTable());
markResultSetAsExported(extractedData, tableMetaData);
}
} finally {
if (resultSetPrimaryKeys != null) {
resultSetPrimaryKeys.close();
}
if (versionedUpdateStatement != null) {
versionedUpdateStatement.close();
}
}
}
//the old way as fallback
private void markResultSetAsExported(ResultSet extractedData, TableMetaData tableMetaData) throws SQLException {
while (extractedData.next()) {
extractedData.updateString(tableMetaData.getUpdateColumn(), tableMetaData.getUpdateValue());
extractedData.updateRow();
}
}