Postgresql 中的意外死锁(使用 psycopg2 时)
Unexpected deadlocks in Postgresql (while using psycopg2)
我正在处理一个我不明白的 PostgreSQL 死锁问题。
我正在尝试使用 Python、psycopg2 模块和 Postgres 数据库来实现类似 Round Robin 的算法。
我想要一个应用程序的多个实例执行以下操作:
- 将整个 table 与任务列表锁定很短的时间间隔
- 选择要执行的任务(最近最少执行的任务,有一些限制)
- 标记任务,这样其他实例就不会选择它(只允许一个实例同时执行相同的任务)
- 解锁 table
- 执行任务
- 重复
其他会话也应该能够更新此 table 的某些字段。
突然间,我遇到了无法解释的僵局。我已经尽可能地简化了我的 Python 脚本,我在每个语句之后执行一个提交(如果可能的话),但仍然时不时地出现死锁。
出于某种原因,每次遇到死锁时,它都是事务中的第一条语句。这怎么可能?我的 table 没有任何触发器、外键约束或任何会使事情变得复杂的东西。我能想到的唯一解释是 PostgreSQL 不会在提交后立即释放锁。或者可能是 psycopg2 没有按我预期的方式工作?我未能通过在不同会话中手动 运行ning 语句重现该问题。
死锁很少见,但我至少每隔几个小时就会遇到一次
我 运行正在使用 PostgreSQL 9.6.1 和 Python 2.7.12
这是我运行的代码(这只是我为了解决问题而制作的简化示例):
import psycopg2
import sys
import datetime
import time
sys.path.append('/opt/workflow/lib')
import config
import ovs_lib
instance_type='scan_master'
instance_id=sys.argv[1]
dbh=psycopg2.connect(dbname=config.values['pgsql']['db'], host=config.values['pgsql']['host'], port=int(config.values['pgsql']['port']), user=config.values['pgsql']['user'], password=config.values['pgsql']['pass'])
dbh.set_session(isolation_level='READ COMMITTED', autocommit=False)
cursor = dbh.cursor()
cursor.execute("SET search_path TO "+config.values['pgsql']['schema'])
def sanitize(string):
string=string.replace("'","''")
return string
def get_task(instance_id):
task_id=None
out_struct={}
instance_id=sanitize(instance_id)
#Lock whole table
dbh.commit() #Just in case
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table
cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id='"+instance_id+"'") #release task from previous run
#Now get the task
sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\n"
sql+="FROM wf_task t\n"
sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\n"
sql+="WHERE status='A'\n"
sql+="AND t.scanner_instance_id is NULL\n"
sql+="AND last_scan_ts<=now()-scan_interval*interval '1 second'\n"
sql+="ORDER BY last_scan_ts\n"
sql+="LIMIT 1\n"
cursor.execute(sql)
cnt=cursor.rowcount
if cnt>0:
row=cursor.fetchone()
task_id=row[0]
sql ="UPDATE wf_task SET scanner_instance_id='"+instance_id+"',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)
cursor.execute(sql)
scanner_function=row[1]
parallel_runs=row[2]
out_struct['task_id']=task_id
out_struct['scanner_function']=scanner_function
out_struct['parallel_runs']=parallel_runs
dbh.commit()
return out_struct
def process_task(task_id):
sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
while True:
if not ovs_lib.check_control(instance_type, instance_id):
now_time=datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
print now_time+" Stop sygnal received"
exit(0)
task_struct=get_task(instance_id)
if 'task_id' not in task_struct:
time.sleep(1)
continue
process_task(task_struct['task_id'])
下面是我遇到的错误示例:
Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 70, in <module>
process_task(task_struct['task_id'])
File "/opt/workflow/bin/scan_simple.py", line 58, in process_task
cursor.execute(sql)
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.
Process 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.
Process 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.
HINT: See server log for query details.
CONTEXT: while updating tuple (8,12) in relation "wf_task"
Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 66, in <module>
task_struct=get_task(instance_id)
File "/opt/workflow/bin/scan_simple.py", line 27, in get_task
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.
Process 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.
HINT: See server log for query details.
CONTEXT: while locking tuple (17,9) in relation “wf_task"
当时我同时拥有这个脚本的 6 个实例 运行ning
数据库中没有其他会话处于活动状态。
稍后更新
今天我学到了一些关于 Postgres 的新知识,它与这个问题非常相关
从 9.5 版开始,PostgreSQL 支持 SKIP LOCKED 语句,它以非常优雅的方式解决了我试图设计我的应用程序的问题
如果您在尝试实施某种队列或循环解决方案时正在努力解决 PostgreSQL 中的并发问题,那么您绝对必须阅读以下内容:
https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
问题可能是第一个 SELECT ... FOR UPDATE
中的顺序扫描并不总是 return 相同顺序的行,因此并发执行此语句会锁定 table 顺序不同。这会导致您遇到僵局。
有几种解决方案,增加善良:
我认为为这次更新锁定整个 table 的技术对性能来说很糟糕,但如果你坚持保留你的代码,你可以将 synchronize_seqscans
设置为off
以便所有顺序扫描 return 行以相同的顺序。但是你真的不应该像你那样将所有行锁定在 table 中,因为
这会导致不必要的顺序扫描。
不安全。在您锁定行的时间和您 运行 您的 UPDATE
之间的时间之间,有人可以 INSERT
新行。
如果您真的想锁定整个 table,请使用 LOCK TABLE
语句而不是锁定 table 中的所有行。这也将摆脱僵局。
最好的解决方案可能是用 UPDATE
本身锁定行。为避免死锁,请检查 PostgreSQL 用于 UPDATE
的执行计划。这将是索引扫描或顺序扫描。使用索引扫描是安全的,因为这将 return 行按特定顺序排列。对于顺序扫描,禁用上述 synchronize_seqscans
功能,理想情况下仅针对事务:
START TRANSACTION;
SET LOCAL synchronize_seqscans = off;
/* your UPDATEs go here */
COMMIT;
我正在处理一个我不明白的 PostgreSQL 死锁问题。
我正在尝试使用 Python、psycopg2 模块和 Postgres 数据库来实现类似 Round Robin 的算法。
我想要一个应用程序的多个实例执行以下操作:
- 将整个 table 与任务列表锁定很短的时间间隔
- 选择要执行的任务(最近最少执行的任务,有一些限制)
- 标记任务,这样其他实例就不会选择它(只允许一个实例同时执行相同的任务)
- 解锁 table
- 执行任务
- 重复
其他会话也应该能够更新此 table 的某些字段。
突然间,我遇到了无法解释的僵局。我已经尽可能地简化了我的 Python 脚本,我在每个语句之后执行一个提交(如果可能的话),但仍然时不时地出现死锁。
出于某种原因,每次遇到死锁时,它都是事务中的第一条语句。这怎么可能?我的 table 没有任何触发器、外键约束或任何会使事情变得复杂的东西。我能想到的唯一解释是 PostgreSQL 不会在提交后立即释放锁。或者可能是 psycopg2 没有按我预期的方式工作?我未能通过在不同会话中手动 运行ning 语句重现该问题。
死锁很少见,但我至少每隔几个小时就会遇到一次
我 运行正在使用 PostgreSQL 9.6.1 和 Python 2.7.12
这是我运行的代码(这只是我为了解决问题而制作的简化示例):
import psycopg2
import sys
import datetime
import time
sys.path.append('/opt/workflow/lib')
import config
import ovs_lib
instance_type='scan_master'
instance_id=sys.argv[1]
dbh=psycopg2.connect(dbname=config.values['pgsql']['db'], host=config.values['pgsql']['host'], port=int(config.values['pgsql']['port']), user=config.values['pgsql']['user'], password=config.values['pgsql']['pass'])
dbh.set_session(isolation_level='READ COMMITTED', autocommit=False)
cursor = dbh.cursor()
cursor.execute("SET search_path TO "+config.values['pgsql']['schema'])
def sanitize(string):
string=string.replace("'","''")
return string
def get_task(instance_id):
task_id=None
out_struct={}
instance_id=sanitize(instance_id)
#Lock whole table
dbh.commit() #Just in case
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table
cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id='"+instance_id+"'") #release task from previous run
#Now get the task
sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\n"
sql+="FROM wf_task t\n"
sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\n"
sql+="WHERE status='A'\n"
sql+="AND t.scanner_instance_id is NULL\n"
sql+="AND last_scan_ts<=now()-scan_interval*interval '1 second'\n"
sql+="ORDER BY last_scan_ts\n"
sql+="LIMIT 1\n"
cursor.execute(sql)
cnt=cursor.rowcount
if cnt>0:
row=cursor.fetchone()
task_id=row[0]
sql ="UPDATE wf_task SET scanner_instance_id='"+instance_id+"',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)
cursor.execute(sql)
scanner_function=row[1]
parallel_runs=row[2]
out_struct['task_id']=task_id
out_struct['scanner_function']=scanner_function
out_struct['parallel_runs']=parallel_runs
dbh.commit()
return out_struct
def process_task(task_id):
sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
cursor.execute(sql)
dbh.commit()
while True:
if not ovs_lib.check_control(instance_type, instance_id):
now_time=datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
print now_time+" Stop sygnal received"
exit(0)
task_struct=get_task(instance_id)
if 'task_id' not in task_struct:
time.sleep(1)
continue
process_task(task_struct['task_id'])
下面是我遇到的错误示例:
Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 70, in <module>
process_task(task_struct['task_id'])
File "/opt/workflow/bin/scan_simple.py", line 58, in process_task
cursor.execute(sql)
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.
Process 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.
Process 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.
HINT: See server log for query details.
CONTEXT: while updating tuple (8,12) in relation "wf_task"
Traceback (most recent call last):
File "/opt/workflow/bin/scan_simple.py", line 66, in <module>
task_struct=get_task(instance_id)
File "/opt/workflow/bin/scan_simple.py", line 27, in get_task
cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL: Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.
Process 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.
HINT: See server log for query details.
CONTEXT: while locking tuple (17,9) in relation “wf_task"
当时我同时拥有这个脚本的 6 个实例 运行ning
数据库中没有其他会话处于活动状态。
稍后更新
今天我学到了一些关于 Postgres 的新知识,它与这个问题非常相关
从 9.5 版开始,PostgreSQL 支持 SKIP LOCKED 语句,它以非常优雅的方式解决了我试图设计我的应用程序的问题
如果您在尝试实施某种队列或循环解决方案时正在努力解决 PostgreSQL 中的并发问题,那么您绝对必须阅读以下内容:
https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
问题可能是第一个 SELECT ... FOR UPDATE
中的顺序扫描并不总是 return 相同顺序的行,因此并发执行此语句会锁定 table 顺序不同。这会导致您遇到僵局。
有几种解决方案,增加善良:
我认为为这次更新锁定整个 table 的技术对性能来说很糟糕,但如果你坚持保留你的代码,你可以将
synchronize_seqscans
设置为off
以便所有顺序扫描 return 行以相同的顺序。但是你真的不应该像你那样将所有行锁定在 table 中,因为这会导致不必要的顺序扫描。
不安全。在您锁定行的时间和您 运行 您的
UPDATE
之间的时间之间,有人可以INSERT
新行。
如果您真的想锁定整个 table,请使用
LOCK TABLE
语句而不是锁定 table 中的所有行。这也将摆脱僵局。最好的解决方案可能是用
UPDATE
本身锁定行。为避免死锁,请检查 PostgreSQL 用于UPDATE
的执行计划。这将是索引扫描或顺序扫描。使用索引扫描是安全的,因为这将 return 行按特定顺序排列。对于顺序扫描,禁用上述synchronize_seqscans
功能,理想情况下仅针对事务:START TRANSACTION; SET LOCAL synchronize_seqscans = off; /* your UPDATEs go here */ COMMIT;