Transaction rollback
I have a large list whose elements are 53,000,000 smaller lists. I want to commit each of these smaller lists to a database as one row, using a batch size of 1,000,000: each time the script connects to the database it commits 1,000,000 elements, then disconnects, then connects again to commit the next 1,000,000 rows.

Now my problem is this: if an error occurs partway through, say after 50,000,000 rows have already been committed, I need to delete all the rows from the database and try committing everything again from scratch.

I was thinking I could use rollback() to remove the 50,000,000 rows added so far, but since I commit in batches inside a loop, I don't see how to roll back all 50,000,000 rows.

Does anyone have any suggestions?

Here is my script ("results" is the list containing the 53,000,000 smaller lists):
batch = []
counter = 0
BATCH_SIZE = 1000000
cursor_count = 0

def prepare_names(names):
    return [w.replace("'", '') for w in names]

for i in range(len(results)):
    if counter < BATCH_SIZE:
        batch.append(prepare_names([results[i][0], results[i][1], results[i][2]]))  # batch => [[ACC1234.0, 'Some full taxa name'], ...]
        counter += 1
    else:
        batch.append(prepare_names([results[i][0], results[i][1], results[i][2]]))
        values = ", ".join([f"('{d[0]}', '{d[1]}', '{d[2]}')" for d in batch])
        sql = f"INSERT INTO accession_taxonomy(accession_number, taxon_id, taxonomy) VALUES {values}"
        try:
            cursor.execute(sql)
            db.commit()
        except Exception as exception:
            print(exception)
            print(f"Problem with query: {sql}")
        print(cursor.rowcount, "Records Inserted")
        cursor_count += cursor.rowcount
        counter = 0
        batch = []
else:
    if batch:
        values = ", ".join([f"('{d[0]}', '{d[1]}', '{d[2]}')" for d in batch])
        sql = f"INSERT INTO accession_taxonomy(accession_number, taxon_id, taxonomy) VALUES {values}"
        try:
            cursor.execute(sql)
            db.commit()
        except Exception as exception:
            print(exception)
            print(f"Problem with query: {sql}")
        print(cursor.rowcount, "Records Inserted")
        cursor_count += cursor.rowcount
print("Total Number Of %s Rows Has Been Added." % cursor_count)
db.close()
There is no rollback after a commit.

Consider this:
1st Attempt 1M rows : committed
2nd Attempt 1M rows : committed
3rd Attempt 1M rows : error

You can only roll back the 3rd attempt; the 1st and 2nd are already done.
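A quick way to see why: with DB-API drivers, rollback() only undoes work performed since the last commit(). A minimal sketch, with sqlite3 standing in for the actual database driver:

```python
import sqlite3

# Demonstrates that rollback() cannot undo an earlier commit().
db = sqlite3.connect(":memory:")
cursor = db.cursor()
cursor.execute("CREATE TABLE t (x INTEGER)")

cursor.execute("INSERT INTO t VALUES (1)")
db.commit()                      # batch 1: committed, now permanent

cursor.execute("INSERT INTO t VALUES (2)")
db.rollback()                    # undoes only the uncommitted insert

cursor.execute("SELECT x FROM t")
print(cursor.fetchall())         # [(1,)] -- the committed row survives
```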
Workaround

Modify your accession_taxonomy table and add a field named insertHash. Each run of your batch update process uses one unique value for this field, say todaysDate. If any of your insert steps fails, you can then execute:

Delete T from accession_taxonomy T Where T.insertHash = 'TheValueUSet'

So basically it becomes:
1st Attempt 1M rows : committed
2nd Attempt 1M rows : committed
3rd Attempt 1M rows : error
Delete AllRows where insertHash = 'TheValueUSet'
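Here is one way the idea could look in Python. This is a hypothetical sketch: sqlite3 stands in for the real database, and batch_tag and batches are illustrative names that are not part of the original script (the deliberately bad third batch just simulates a failure).

```python
import sqlite3
import uuid

# Sketch of the insertHash idea: every row written in this run carries one
# unique tag, so if any batch fails after earlier batches were committed,
# a single DELETE removes everything the run inserted.
db = sqlite3.connect(":memory:")
cursor = db.cursor()
cursor.execute("""CREATE TABLE accession_taxonomy (
    accession_number TEXT, taxon_id TEXT, taxonomy TEXT, insertHash TEXT)""")

batch_tag = uuid.uuid4().hex          # unique per run (instead of todaysDate)
batches = [[("ACC1", "1", "TaxA")], [("ACC2", "2", "TaxB")], None]  # None => simulated failure

try:
    for rows in batches:
        cursor.executemany(
            "INSERT INTO accession_taxonomy VALUES (?, ?, ?, ?)",
            [(a, t, x, batch_tag) for a, t, x in rows])  # raises on None
        db.commit()
except Exception:
    # Committed batches can no longer be rolled back, so delete them by tag.
    cursor.execute("DELETE FROM accession_taxonomy WHERE insertHash = ?",
                   (batch_tag,))
    db.commit()

cursor.execute("SELECT COUNT(*) FROM accession_taxonomy")
print(cursor.fetchone()[0])           # 0 -- the failed run left nothing behind
```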
Having said all that, are you sure you want to push 1M rows in one shot? Have you checked whether your server can accept a packet that large?
I would use a couple of flags to track that:

- something was inserted
- no error occurred

Then use those flags to decide whether to commit or roll back, for example:
nothing_wrong_happened = True
something_was_inserted = False

for i in range(len(results)):
    # Your code that generates the query
    try:
        cursor.execute(sql)
        something_was_inserted = True   # <-- you inserted something
    except Exception as exception:
        nothing_wrong_happened = False  # <-- Something bad happened
        print(exception)
        print(f"Problem with query: {sql}")
    # the rest of your code
else:
    # Your code that generates the query
    try:
        cursor.execute(sql)
        something_was_inserted = True   # <-- you inserted something
    except Exception as exception:
        nothing_wrong_happened = False  # <-- Something bad happened
        print(exception)
        print(f"Problem with query: {sql}")
    # the rest of your code

# The loop is now over
if something_was_inserted:
    if nothing_wrong_happened:
        db.commit()    # commit everything
    else:
        db.rollback()  # rollback everything
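A self-contained demonstration of this commit-or-rollback pattern, with sqlite3 standing in for the actual driver (the bad third batch is contrived to force an error):

```python
import sqlite3

# All batches run on one connection with no intermediate commits, so a
# single rollback() discards every batch when anything fails.
db = sqlite3.connect(":memory:")
cursor = db.cursor()
cursor.execute("CREATE TABLE t (x INTEGER)")

batches = [[(1,), (2,)], [(3,), (4,)], None]   # None triggers an error

nothing_wrong_happened = True
something_was_inserted = False
for rows in batches:
    try:
        cursor.executemany("INSERT INTO t VALUES (?)", rows)
        something_was_inserted = True
    except Exception as exception:
        nothing_wrong_happened = False
        print(exception)

if something_was_inserted:
    if nothing_wrong_happened:
        db.commit()    # commit everything
    else:
        db.rollback()  # rollback everything

cursor.execute("SELECT COUNT(*) FROM t")
print(cursor.fetchone()[0])   # 0 -- everything was rolled back
```

The trade-off versus the batched-commit version in the question is that the uncommitted transaction must hold all pending rows until the end, which the server has to be able to accommodate.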