在 InterruptionKey 时使用 psycopg2 杀死未完成的 SQL 提交的查询

Kill unfinished SQL commited query with psycopg2 when InterruptionKey

我使用一个 Python 脚本,该脚本 运行 使用 psycopg2 库在 AWS Redshift 数据库上自动提交各种 SQL 查询。该脚本是从我的本地工作站手动执行的。流程如下:

由于各种原因,数据库可能不可用(网络问题,许多查询已经 运行ning...),最好停止 Python 脚本。此时,我通过 SQL 客户端 (SQL workbench) 通过检索与这些查询关联的 pid 来终止已经提交(和未完成)的查询。当用户停止 (ctrl+c) 时,我想直接在 Python 脚本中自动执行最后一步。流程将是:

我在笔记本电脑上做了一些测试,检查我是否可以检索 back_pid 信息:

log = logging.getLogger(__name__)
session = psycopg2.connect(
    connection_factory=LoggingConnection,
        host=host,
        port=port,
        dbname=database,
        user=user,
        password=password,
        sslmode="require",
)
session.initialize(log)
session.set_session(autocommit=True)

query = """
CREATE OR REPLACE FUNCTION janky_sleep (x float) RETURNS bool IMMUTABLE as $$
    from time import sleep
    sleep(x)
    return True
$$ LANGUAGE plpythonu;
"""

cur = session.cursor()
cur.execute(query)
cur.execute("select janky_sleep(60.0)")

我使用睡眠函数来复制需要 60 秒才能完成的查询行为。 当得到 backend_pid 如下:

session.info.backend_pid

问题是会话对象已经被 execute() 方法使用(运行查询)并且 backend_pid 信息仅在会话空闲时产生,即当查询已完成。

我想旋转一个并发 Python 进程来监视父进程。一旦父进程停止,子进程将通过第二个数据库连接获得 backend_pid,然后 运行 kill 查询。然而,这种方法似乎有些矫枉过正。

处理这种情况的正确方法是什么?

谢谢

我终于使用了从文档中找到的资源: http://initd.org/psycopg/docs/faq.html#faq-interrupt-query。它使 Psycopg2 能够获取 SIGINT 信号并终止后续查询。

>>> psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
>>> cnn = psycopg2.connect('')
>>> cur = cnn.cursor()
>>> cur.execute("select pg_sleep(10)")
^C
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  QueryCanceledError: canceling statement due to user request

>>> cnn.rollback()
>>> # You can use the connection and cursor again from here