psycopg2 - 将操作推迟到提交之后

psycopg2 - deferring action until after commit

我想找到一个好的模式来将执行操作推迟到当前事务成功提交之后。 例如,我可能有:

with pool(...)  as connection:
    execute_steps(connection) 

其中一个子函数通过将它们提交到外部消息队列来启动一个或多个异步任务(rabbit/AMQP,...)...仅当事务成功提交时。 即

 def substep(connection): 
       submit_to_queue()

... 如果事务随后失败(在提交到队列之后),或者如果放入队列的作业在事务提交之前执行,则可能会失败。所以——submit_to_queue 必须在 connection.commit.

之后执行

执行此操作的自然方法是收集回调步骤:

def substep(connection):

  def submit_to_queue(): ...
  ...
  connection.on_after_commit(submit_to_queue)
  ...

并拥有:

@contextmanager
def db_pool(db_name="lb2", current_connection=None):

    connection = connection_pool.getconn()
    connection.on_after_commit = []
    try:

        # Do the work
        yield connection

        connection.commit()

        # Reset the after_success list and execute
        for job in connection.after_success():
            job()
        connection.after_success = [], 

    except Exception:
        connection.rollback()
        connection.after_success = []
        raise

但是,psycopg2 的连接是一个不可变对象。那么...还有另一种方法可以使用 psycopg2 注册 post-执行回调吗?

谢谢。

子类化 psycopg2.extensions.connection 将允许您向对象添加属性(请参阅解释 here 为什么):

In [44]: import psycopg2

In [45]: import psycopg2.extensions

In [46]: conn = psycopg2.connect('dbname=postgres')

In [47]: conn.foo = 'bar'
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-47-3ab9f2c6e77e> in <module>()
----> 1 conn.foo = 'bar'

AttributeError: 'psycopg2.extensions.connection' object has no attribute 'foo'

In [48]: class MyConnection(psycopg2.extensions.connection):
   ...       pass
   ...

In [49]: conn2 = MyConnection('dbname=postgres')

In [50]: conn2.foo = 'bar'

In [51]: conn2.foo
Out[51]: 'bar'