使用 Flask 和 SQLAlchemy 的 Celery 任务中的数据库未更新
Database is not updated in Celery task with Flask and SQLAlchemy
我正在使用 Flask 和 SQLAlchemy 编写 Web 应用程序。我的程序需要在后台处理一些东西,然后在数据库中将这些东西标记为已处理。使用 standard Flask/Celery example,我有这样的东西:
from flask import Flask
from celery import Celery
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
celery = make_celery(app)
class Stuff(db.Model):
id = db.Column(db.Integer, primary_key=True)
processed = db.Column(db.Boolean)
@celery.task()
def process_stuff(stuff):
# process stuff here
stuff.processed = True
db.session.commit()
@app.route("/process_stuff/<id>")
def do_process_stuff(id):
stuff = Stuff.query.get_or_404(id)
process_stuff.delay(stuff)
return redirect(url_for("now_wait"))
我可以从 process_stuff
访问我的数据库(例如提交像 Stuff.query.get(some_id)
工作这样的查询),但是 db.session.commit()
什么都不做:我的 stuff
记录没有更新。根据 Celery worker 日志,发生了提交,但数据库没有任何变化。我的 db.session.commit()
有问题吗?是否有可能以某种方式做出这样的承诺?
好的,我知道了。 stuff
传递给 process_stuff()
未附加到 db.session
。我必须在 process_stuff()
中明确请求才能获得正确的 stuff
对象,如下所示:
@celery.task()
def process_stuff(stuff):
# process stuff here
my_stuff = Stuff.query.get(stuff.id)
my_stuff.processed = True
db.session.commit()
现在可以了。
我正在使用 Flask 和 SQLAlchemy 编写 Web 应用程序。我的程序需要在后台处理一些东西,然后在数据库中将这些东西标记为已处理。使用 standard Flask/Celery example,我有这样的东西:
from flask import Flask
from celery import Celery
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
celery = make_celery(app)
class Stuff(db.Model):
id = db.Column(db.Integer, primary_key=True)
processed = db.Column(db.Boolean)
@celery.task()
def process_stuff(stuff):
# process stuff here
stuff.processed = True
db.session.commit()
@app.route("/process_stuff/<id>")
def do_process_stuff(id):
stuff = Stuff.query.get_or_404(id)
process_stuff.delay(stuff)
return redirect(url_for("now_wait"))
我可以从 process_stuff
访问我的数据库(例如提交像 Stuff.query.get(some_id)
工作这样的查询),但是 db.session.commit()
什么都不做:我的 stuff
记录没有更新。根据 Celery worker 日志,发生了提交,但数据库没有任何变化。我的 db.session.commit()
有问题吗?是否有可能以某种方式做出这样的承诺?
好的,我知道了。 stuff
传递给 process_stuff()
未附加到 db.session
。我必须在 process_stuff()
中明确请求才能获得正确的 stuff
对象,如下所示:
@celery.task()
def process_stuff(stuff):
# process stuff here
my_stuff = Stuff.query.get(stuff.id)
my_stuff.processed = True
db.session.commit()
现在可以了。