SQLAlchemy 在 Celery 任务中返回陈旧数据
SQLAlchemy returns stale data in celery tasks
我当前的设置包括
Flask、Flask-SQLAlchemy、Celery
现在我遇到的问题是:有时 Celery 任务中的数据库查询会返回陈旧数据。例如,当我从表中请求最后一条记录时,得到的却是倒数第二条,而最后一条记录是在执行查询之前 10-15 分钟才插入的。此外,有时我还会看到这样的异常:
OperationalError("(OperationalError) (2006, \'MySQL server has gone away\')",)'
以下是回溯信息(traceback):
File "/home/sys_user/repo/my_app/app/tasks/reminders.py", line 63, in run
config = Reminder.query.filter_by(id=reminder_id).first()
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2341, in first
ret = list(self[0:1])
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2208, in __getitem__
return list(res)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2412, in __iter__
return self._execute_and_instances(context)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2427, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 729, in execute
return meth(self, multiparams, params)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 321, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 826, in _execute_clauseelement
compiled_sql, distilled_params
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 958, in _execute_context
context)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1160, in _handle_dbapi_exception
exc_info
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 951, in _execute_context
context)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 436, in do_execute
cursor.execute(statement, parameters)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/MySQLdb/cursors.py", line 205, in execute
self.errorhandler(self, exc, value)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
raise errorclass, errorvalue
以下是我的celery_app.py
from app import app as flask_app, db
from celery.signals import worker_process_init
def make_celery(app=None):
    """Build a Celery application whose tasks run inside a Flask app context.

    Args:
        app: Flask application to bind the tasks to. When omitted, the
            module-level ``flask_app`` is used, which is what every call
            did before this argument was honoured.

    Returns:
        The configured ``Celery`` instance, with ``celery.Task`` replaced by
        a base class that pushes the application context around each call.
    """
    # The argument used to be accepted but silently ignored.
    if app is None:
        app = flask_app

    celery = Celery()
    celery.conf.update(app.config)
    celery.config_from_object(celeryconfig)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        """Task base class that wraps every invocation in the app context."""

        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                try:
                    return TaskBase.__call__(self, *args, **kwargs)
                finally:
                    # Worker processes are long-lived: release the scoped
                    # session after every task so no transaction (or
                    # connection) is carried over into the next one.
                    # remove() is a no-op when nothing is registered.
                    # NOTE(review): a transaction left open between tasks is
                    # a likely source of the stale reads -- confirm against
                    # the database's isolation level.
                    db.session.remove()

    db.init_app(app)
    celery.Task = ContextTask
    return celery
# Module-level Celery application, built by calling make_celery() with no
# arguments.
celery_instance = make_celery()
@worker_process_init.connect
def celery_worker_init_db(**_):
    """Initialise the database extension when a worker process starts.

    Connected to Celery's ``worker_process_init`` signal. Whatever keyword
    arguments the signal sends are accepted and ignored.
    """
    # NOTE(review): if the parent process opened database connections before
    # forking, the child may also need to discard the inherited connection
    # pool -- confirm against the SQLAlchemy pooling documentation.
    db.init_app(flask_app)
以下是定义的一些任务:
class ReminderTask(Task):
    """Celery task that sends the reminders belonging to one ``Reminder``."""

    # Nothing reads this task's return value, so do not store a result.
    ignore_result = True

    def run(self, data):
        """Look up the reminder named in *data* and send its reminders.

        Args:
            data: Mapping with a ``'reminder_id'`` entry that ``int()`` can
                convert.
        """
        import logging

        reminder_id = int(data['reminder_id'])
        reminder = Reminder.query.filter_by(id=reminder_id).first()
        if reminder is None:
            # first() returns None when no row matches; previously this fell
            # through to an opaque AttributeError on None.
            logging.getLogger(__name__).warning(
                "Reminder %s not found; nothing to send", reminder_id)
            return
        reminder.send_reminders()
同样的问题已在 SQLAlchemy 的 Google Group 上得到了回答。
我当前的设置包括
Flask、Flask-SQLAlchemy、Celery
现在我遇到的问题是:有时 Celery 任务中的数据库查询会返回陈旧数据。例如,当我从表中请求最后一条记录时,得到的却是倒数第二条,而最后一条记录是在执行查询之前 10-15 分钟才插入的。此外,有时我还会看到这样的异常:
OperationalError("(OperationalError) (2006, \'MySQL server has gone away\')",)'
以下是回溯信息(traceback):
File "/home/sys_user/repo/my_app/app/tasks/reminders.py", line 63, in run
config = Reminder.query.filter_by(id=reminder_id).first()
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2341, in first
ret = list(self[0:1])
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2208, in __getitem__
return list(res)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2412, in __iter__
return self._execute_and_instances(context)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2427, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 729, in execute
return meth(self, multiparams, params)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 321, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 826, in _execute_clauseelement
compiled_sql, distilled_params
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 958, in _execute_context
context)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1160, in _handle_dbapi_exception
exc_info
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 951, in _execute_context
context)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 436, in do_execute
cursor.execute(statement, parameters)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/MySQLdb/cursors.py", line 205, in execute
self.errorhandler(self, exc, value)
File "/home/sys_user/Envs/flask/local/lib/python2.7/site-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
raise errorclass, errorvalue
以下是我的celery_app.py
from app import app as flask_app, db
from celery.signals import worker_process_init
def make_celery(app=None):
    """Build a Celery application whose tasks run inside a Flask app context.

    Args:
        app: Flask application to bind the tasks to. When omitted, the
            module-level ``flask_app`` is used, which is what every call
            did before this argument was honoured.

    Returns:
        The configured ``Celery`` instance, with ``celery.Task`` replaced by
        a base class that pushes the application context around each call.
    """
    # The argument used to be accepted but silently ignored.
    if app is None:
        app = flask_app

    celery = Celery()
    celery.conf.update(app.config)
    celery.config_from_object(celeryconfig)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        """Task base class that wraps every invocation in the app context."""

        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                try:
                    return TaskBase.__call__(self, *args, **kwargs)
                finally:
                    # Worker processes are long-lived: release the scoped
                    # session after every task so no transaction (or
                    # connection) is carried over into the next one.
                    # remove() is a no-op when nothing is registered.
                    # NOTE(review): a transaction left open between tasks is
                    # a likely source of the stale reads -- confirm against
                    # the database's isolation level.
                    db.session.remove()

    db.init_app(app)
    celery.Task = ContextTask
    return celery
# Module-level Celery application, built by calling make_celery() with no
# arguments.
celery_instance = make_celery()
@worker_process_init.connect
def celery_worker_init_db(**_):
    """Initialise the database extension when a worker process starts.

    Connected to Celery's ``worker_process_init`` signal. Whatever keyword
    arguments the signal sends are accepted and ignored.
    """
    # NOTE(review): if the parent process opened database connections before
    # forking, the child may also need to discard the inherited connection
    # pool -- confirm against the SQLAlchemy pooling documentation.
    db.init_app(flask_app)
以下是定义的一些任务:
class ReminderTask(Task):
    """Celery task that sends the reminders belonging to one ``Reminder``."""

    # Nothing reads this task's return value, so do not store a result.
    ignore_result = True

    def run(self, data):
        """Look up the reminder named in *data* and send its reminders.

        Args:
            data: Mapping with a ``'reminder_id'`` entry that ``int()`` can
                convert.
        """
        import logging

        reminder_id = int(data['reminder_id'])
        reminder = Reminder.query.filter_by(id=reminder_id).first()
        if reminder is None:
            # first() returns None when no row matches; previously this fell
            # through to an opaque AttributeError on None.
            logging.getLogger(__name__).warning(
                "Reminder %s not found; nothing to send", reminder_id)
            return
        reminder.send_reminders()
同样的问题已在 SQLAlchemy 的 Google Group 上得到了回答。