使用sqlalchemy + postgresql时如何在芹菜中回滚异常
How to rollback exceptions within celery when using sqlalchemy + postgresql
这就是我的代码的样子
import transaction
@app.task(name='task_name')
def task_name_fn(*args, **kwargs):
with transaction.manager:
try:
actual_fn(*args, **kwargs)
transaction.commit()
except:
transaction.abort()
不过,我的transaction.abort()
好像没有回滚。此 worker 上的所有后续 celery 任务均失败。我收到以下错误
This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback().
我做错了什么?
更好的问题是,您如何编写 task_name_fn
才能避免出现此问题?
首先,您不需要捕捉异常来中止交易。
import transaction
@app.task(name='task_name')
def task_name_fn(*args, **kwargs):
with transaction.manager:
actual_fn(*args, **kwargs)
如果发生异常,交易将中止。
接下来,您可以在任务装饰器中将其抽象化。类似的东西(未经测试,但可能按原样工作):
from functools import wraps
import transaction
def tm_task(f):
@wraps(f)
def decorated(*args, **kwargs):
with transaction.manager:
return f(*args, **kwargs)
return app.task()(decorated)
@tm_task
def actual_fn(*args, **kwargs):
pass # your function code here instead of calling other function
此外,由于您正在使用事务,您可能希望在事务提交后延迟作业排队。因为,例如,如果您在事务中插入一行并排队一个作业以对该行执行某些操作,它可能会在第一个事务提交之前到达工作程序,并且该行在事务之外尚不可用。类似于:
class AfterCommitTask(Task):
def apply_async(self, *args, **kw):
tx = transaction.get()
def hook(status):
if status: # Only queue if the transaction was succesfull.
super(AfterCommitTask, self).apply_async(*args, **kw)
tx.addAfterCommitHook(hook)
def tm_task(f):
@wraps(f)
def decorated(*args, **kwargs):
with transaction.manager:
return f(*args, **kwargs)
return app.task(base=AfterCommitTask)(decorated)
@tm_task
def actual_fn(*args, **kwargs):
pass # your function code here instead of calling other function
这就是我的代码的样子
import transaction
@app.task(name='task_name')
def task_name_fn(*args, **kwargs):
with transaction.manager:
try:
actual_fn(*args, **kwargs)
transaction.commit()
except:
transaction.abort()
不过,我的transaction.abort()
好像没有回滚。此 worker 上的所有后续 celery 任务均失败。我收到以下错误
This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback().
我做错了什么?
更好的问题是,您如何编写 task_name_fn
才能避免出现此问题?
首先,您不需要捕捉异常来中止交易。
import transaction
@app.task(name='task_name')
def task_name_fn(*args, **kwargs):
with transaction.manager:
actual_fn(*args, **kwargs)
如果发生异常,交易将中止。
接下来,您可以在任务装饰器中将其抽象化。类似的东西(未经测试,但可能按原样工作):
from functools import wraps
import transaction
def tm_task(f):
@wraps(f)
def decorated(*args, **kwargs):
with transaction.manager:
return f(*args, **kwargs)
return app.task()(decorated)
@tm_task
def actual_fn(*args, **kwargs):
pass # your function code here instead of calling other function
此外,由于您正在使用事务,您可能希望在事务提交后延迟作业排队。因为,例如,如果您在事务中插入一行并排队一个作业以对该行执行某些操作,它可能会在第一个事务提交之前到达工作程序,并且该行在事务之外尚不可用。类似于:
class AfterCommitTask(Task):
def apply_async(self, *args, **kw):
tx = transaction.get()
def hook(status):
if status: # Only queue if the transaction was succesfull.
super(AfterCommitTask, self).apply_async(*args, **kw)
tx.addAfterCommitHook(hook)
def tm_task(f):
@wraps(f)
def decorated(*args, **kwargs):
with transaction.manager:
return f(*args, **kwargs)
return app.task(base=AfterCommitTask)(decorated)
@tm_task
def actual_fn(*args, **kwargs):
pass # your function code here instead of calling other function