Celery+Django -- 数据库相关任务的原子事务
Celery+Django -- Atomic transaction with database-related task
在我当前使用 Django、Docker-Compose 和 Celery(以及其他东西)的项目中,基本的上传文件功能 insertIntoDatabase
是从任务中调用的,并且在 views.py
使用 delay
.
调用任务
在databaseinserter.py中:
def insertIntoDatabase(datapoints, user, description): # datapoints is a list of dictionaries, user and description are just strings
# convert data and upload to our database
在tasks.py中:
@app.task()
def db_ins_task(datapoints, user, description):
from databaseinserter import insertIntoDatabase
insertIntoDatabase(datapoints, user, description)
在views.py中:
with transaction.atomic():
db_ins_task.delay(datapoints, user, description)
在将 Celery 引入项目之前,insertIntoDatabase
只是在 views.py
中直接调用,因此不会插入任何无效的数据点列表(即格式不正确的),整个上传将是取消并回滚。但是,现在上传是在异步 celery 任务中,无效的上传不再被正确回滚。既然上传是一项任务,我如何才能确保无效的上传仍然被取消并完全撤消?似乎 Django 1.9 有一些新的东西可能是我需要的:transaction.on_commit
。但是,目前切换到1.9的主要问题是它似乎并不兼容我们项目中的重要依赖Django-Hstore。 1.9 也处于 alpha 阶段,因此即使两者兼容,目前使用起来也不理想。有没有办法在 Django 1.8 中做到这一点?
我也研究过 django_transaction_barrier 并尝试使用它,但没有成功。在tasks.py我把任务改成了
@task(base=TransactionBarrierTask)
def db_ins_task(datapoints, user, description):
from databaseinserter import insertIntoDatabase
insertIntoDatabase(datapoints, user, description)
并且在 views.py 中我更改了任务执行:
with transaction.atomic():
db_ins_task.apply_async_with_barrier(args=(data, user, description,))
但是,我这里的主要问题是,一旦收到任务,Celery 就会抛出关于意外关键字参数的错误:
worker_1 | Traceback (most recent call last):
worker_1 | File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
worker_1 | R = retval = fun(*args, **kwargs)
worker_1 | File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
worker_1 | return self.run(*args, **kwargs)
worker_1 | TypeError: db_ins_task() got an unexpected keyword argument '__transaction_barrier'
那么,最好的方法是什么?我是否应该继续尝试使用 django_transaction_barrier(如果我确实将其用于正确的事情)?如果是这样,我在做什么 wrong/missing 会导致错误?如果没有,从我的数据库中清除无效上传的更好方法是什么?
Celery 是一个异步任务 运行ner,基本上一旦任务被移交给 celery,它的火和忘记。您不能跨流程边界进行交易,因为 celery 将 运行ning 作为一名工人。
您总是可以 运行 另一个任务来查找无效数据点并清理您的数据库。简而言之,您想要一个具有两阶段提交的分布式事务,这不容易实现,因为它有自己的问题并且不确定 Python.
中是否可用
您是否考虑过将 transaction.atomic
语句移动到任务中?或者甚至进入插入函数本身?这些都应该有效。
在我当前使用 Django、Docker-Compose 和 Celery(以及其他东西)的项目中,基本的上传文件功能 insertIntoDatabase
是从任务中调用的,并且在 views.py
使用 delay
.
在databaseinserter.py中:
def insertIntoDatabase(datapoints, user, description): # datapoints is a list of dictionaries, user and description are just strings
# convert data and upload to our database
在tasks.py中:
@app.task()
def db_ins_task(datapoints, user, description):
from databaseinserter import insertIntoDatabase
insertIntoDatabase(datapoints, user, description)
在views.py中:
with transaction.atomic():
db_ins_task.delay(datapoints, user, description)
在将 Celery 引入项目之前,insertIntoDatabase
只是在 views.py
中直接调用,因此不会插入任何无效的数据点列表(即格式不正确的),整个上传将是取消并回滚。但是,现在上传是在异步 celery 任务中,无效的上传不再被正确回滚。既然上传是一项任务,我如何才能确保无效的上传仍然被取消并完全撤消?似乎 Django 1.9 有一些新的东西可能是我需要的:transaction.on_commit
。但是,目前切换到1.9的主要问题是它似乎并不兼容我们项目中的重要依赖Django-Hstore。 1.9 也处于 alpha 阶段,因此即使两者兼容,目前使用起来也不理想。有没有办法在 Django 1.8 中做到这一点?
我也研究过 django_transaction_barrier 并尝试使用它,但没有成功。在tasks.py我把任务改成了
@task(base=TransactionBarrierTask)
def db_ins_task(datapoints, user, description):
from databaseinserter import insertIntoDatabase
insertIntoDatabase(datapoints, user, description)
并且在 views.py 中我更改了任务执行:
with transaction.atomic():
db_ins_task.apply_async_with_barrier(args=(data, user, description,))
但是,我这里的主要问题是,一旦收到任务,Celery 就会抛出关于意外关键字参数的错误:
worker_1 | Traceback (most recent call last):
worker_1 | File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
worker_1 | R = retval = fun(*args, **kwargs)
worker_1 | File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
worker_1 | return self.run(*args, **kwargs)
worker_1 | TypeError: db_ins_task() got an unexpected keyword argument '__transaction_barrier'
那么,最好的方法是什么?我是否应该继续尝试使用 django_transaction_barrier(如果我确实将其用于正确的事情)?如果是这样,我在做什么 wrong/missing 会导致错误?如果没有,从我的数据库中清除无效上传的更好方法是什么?
Celery 是一个异步任务 运行ner,基本上一旦任务被移交给 celery,它的火和忘记。您不能跨流程边界进行交易,因为 celery 将 运行ning 作为一名工人。
您总是可以 运行 另一个任务来查找无效数据点并清理您的数据库。简而言之,您想要一个具有两阶段提交的分布式事务,这不容易实现,因为它有自己的问题并且不确定 Python.
中是否可用您是否考虑过将 transaction.atomic
语句移动到任务中?或者甚至进入插入函数本身?这些都应该有效。