Postgresql 中的意外死锁(使用 psycopg2 时)

Unexpected deadlocks in Postgresql (while using psycopg2)


我正在处理一个我不明白的 PostgreSQL 死锁问题。
我正在尝试使用 Python、psycopg2 模块和 Postgres 数据库来实现类似 Round Robin 的算法。
我想要一个应用程序的多个实例执行以下操作:
- 将整个 table 与任务列表锁定很短的时间间隔
- 选择要执行的任务(最近最少执行的任务,有一些限制)
- 标记任务,这样其他实例就不会选择它(只允许一个实例同时执行相同的任务)
- 解锁 table
- 执行任务
- 重复
其他会话也应该能够更新此 table 的某些字段。
突然间,我遇到了无法解释的僵局。我已经尽可能地简化了我的 Python 脚本,我在每个语句之后执行一个提交(如果可能的话),但仍然时不时地出现死锁。
出于某种原因,每次遇到死锁时,它都是事务中的第一条语句。这怎么可能?我的 table 没有任何触发器、外键约束或任何会使事情变得复杂的东西。我能想到的唯一解释是 PostgreSQL 不会在提交后立即释放锁。或者可能是 psycopg2 没有按我预期的方式工作?我未能通过在不同会话中手动 运行ning 语句重现该问题。
死锁很少见,但我至少每隔几个小时就会遇到一次

我 运行正在使用 PostgreSQL 9.6.1 和 Python 2.7.12

这是我运行的代码(这只是我为了解决问题而制作的简化示例):

import psycopg2
import sys
import datetime
import time
sys.path.append('/opt/workflow/lib')
import config
import ovs_lib


instance_type='scan_master'
instance_id=sys.argv[1]

dbh=psycopg2.connect(dbname=config.values['pgsql']['db'], host=config.values['pgsql']['host'], port=int(config.values['pgsql']['port']), user=config.values['pgsql']['user'], password=config.values['pgsql']['pass'])
dbh.set_session(isolation_level='READ COMMITTED', autocommit=False)
cursor = dbh.cursor()
cursor.execute("SET search_path TO "+config.values['pgsql']['schema'])

def sanitize(string):
  string=string.replace("'","''")
  return string

def get_task(instance_id):
  task_id=None
  out_struct={}
  instance_id=sanitize(instance_id)
  #Lock whole table
  dbh.commit() #Just in case
  cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table
  cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id='"+instance_id+"'") #release task from previous run
  #Now get the task
  sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\n"
  sql+="FROM wf_task t\n"
  sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\n"
  sql+="WHERE status='A'\n"
  sql+="AND t.scanner_instance_id is NULL\n"
  sql+="AND last_scan_ts<=now()-scan_interval*interval '1 second'\n"
  sql+="ORDER BY last_scan_ts\n"
  sql+="LIMIT 1\n"
  cursor.execute(sql)
  cnt=cursor.rowcount
  if cnt>0:
    row=cursor.fetchone()
    task_id=row[0]
    sql ="UPDATE wf_task SET scanner_instance_id='"+instance_id+"',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)
    cursor.execute(sql)
    scanner_function=row[1]
    parallel_runs=row[2]
    out_struct['task_id']=task_id
    out_struct['scanner_function']=scanner_function
    out_struct['parallel_runs']=parallel_runs
  dbh.commit()
  return out_struct

def process_task(task_id):
  sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
  cursor.execute(sql)
  dbh.commit()
  sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
  cursor.execute(sql)
  dbh.commit()

while True:
  if not ovs_lib.check_control(instance_type, instance_id):
    now_time=datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
    print now_time+" Stop sygnal received"
    exit(0)
  task_struct=get_task(instance_id)
  if 'task_id' not in task_struct:
    time.sleep(1)
    continue
  process_task(task_struct['task_id'])

下面是我遇到的错误示例:

Traceback (most recent call last):
  File "/opt/workflow/bin/scan_simple.py", line 70, in <module>
process_task(task_struct['task_id'])
  File "/opt/workflow/bin/scan_simple.py", line 58, in process_task
cursor.execute(sql)
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL:  Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.
Process 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.
Process 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (8,12) in relation "wf_task"

Traceback (most recent call last):
  File "/opt/workflow/bin/scan_simple.py", line 66, in <module>
    task_struct=get_task(instance_id)
  File "/opt/workflow/bin/scan_simple.py", line 27, in get_task
    cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL:  Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.
Process 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.
HINT:  See server log for query details.
CONTEXT:  while locking tuple (17,9) in relation “wf_task"

当时我同时拥有这个脚本的 6 个实例 运行ning 数据库中没有其他会话处于活动状态。

稍后更新
今天我学到了一些关于 Postgres 的新知识,它与这个问题非常相关
从 9.5 版开始,PostgreSQL 支持 SKIP LOCKED 语句,它以非常优雅的方式解决了我试图设计我的应用程序的问题
如果您在尝试实施某种队列或循环解决方案时正在努力解决 PostgreSQL 中的并发问题,那么您绝对必须阅读以下内容:
https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/

问题可能是第一个 SELECT ... FOR UPDATE 中的顺序扫描并不总是 return 相同顺序的行,因此并发执行此语句会锁定 table 顺序不同。这会导致您遇到僵局。

有几种解决方案,增加善良:

  • 我认为为这次更新锁定整个 table 的技术对性能来说很糟糕,但如果你坚持保留你的代码,你可以将 synchronize_seqscans 设置为off 以便所有顺序扫描 return 行以相同的顺序。但是你真的不应该像你那样将所有行锁定在 table 中,因为

    • 这会导致不必要的顺序扫描。

    • 不安全。在您锁定行的时间和您 运行 您的 UPDATE 之间的时间之间,有人可以 INSERT 新行。

  • 如果您真的想锁定整个 table,请使用 LOCK TABLE 语句而不是锁定 table 中的所有行。这也将摆脱僵局。

  • 最好的解决方案可能是用 UPDATE 本身锁定行。为避免死锁,请检查 PostgreSQL 用于 UPDATE 的执行计划。这将是索引扫描或顺序扫描。使用索引扫描是安全的,因为这将 return 行按特定顺序排列。对于顺序扫描,禁用上述 synchronize_seqscans 功能,理想情况下仅针对事务:

    START TRANSACTION;
    SET LOCAL synchronize_seqscans = off;
    /* your UPDATEs go here */
    COMMIT;