在 celery 作业和 on_success 函数中清除了 SQLAlchemy 会话
SQLAlchemy session cleared in celery job and on_success function
我正在构建一个工具,从不同的数据库中获取数据,运行对其进行格式化,并将其存储在我自己的数据库中。我正在从 APScheduler 迁移到 Celery,但我 运行 遇到了以下问题:
我用一个class我调用JobRecords
来存储作业运行时,是否成功以及遇到了哪些错误。我用它来知道更新的条目不要回头太远,尤其是因为有些表有数百万行。
由于所有作业的系统都相同,我从 celery Task
对象创建了一个 subclass。我确保该作业在 Flask 应用程序上下文中执行,并且我获取该作业成功完成的最新时间。我还确保我为 now
注册了一个值,以避免查询数据库和添加工作记录之间的时间问题。
class RecordedTask(Task):
"""
Task sublass that uses JobRecords to get the last run date
and add new JobRecords on completion
"""
now: datetime = None
ignore_result = True
_session: scoped_session = None
success: bool = True
info: dict = None
@property
def session(self) -> Session:
"""Making sure we have one global session instance"""
if self._session is None:
from app.extensions import db
self._session = db.session
return self._session
def __call__(self, *args, **kwargs):
from app.models import JobRecord
kwargs['last_run'] = (
self.session.query(func.max(JobRecord.run_at_))
.filter(JobRecord.job_id == self.name, JobRecord.success)
.first()
)[0] or datetime.min
self.now = kwargs['now'] = datetime.utcnow()
with app.app_context():
super(RecordedTask, self).__call__(*args, **kwargs)
def on_failure(self, exc, task_id, args: list, kwargs: dict, einfo):
self.session.rollback()
self.success = False
self.info = dict(
args=args,
kwargs=kwargs,
error=exc.args,
exc=format_exception(exc.__class__, exc, exc.__traceback__),
)
app.logger.error(f"Error executing job '{self.name}': {exc}")
def on_success(self, retval, task_id, args: list, kwargs: dict):
app.logger.info(f"Executed job '{self.name}' successfully, adding JobRecord")
for entry in self.to_trigger:
if len(entry) == 2:
job, kwargs = entry
else:
job, = entry
kwargs = {}
app.logger.info(f"Scheduling job '{job}'")
current_celery_app.signature(job, **kwargs).delay()
def after_return(self, *args, **kwargs):
from app.models import JobRecord
record = JobRecord(
job_id=self.name,
run_at_=self.now,
info=self.info,
success=self.success
)
self.session.add(record)
self.session.commit()
self.session.remove()
我添加了一个工作示例来更新一个名为 Location
的模型,但是像这个工作一样有很多工作。
@celery.task(bind=True, name="update_locations")
def update_locations(self, last_run: datetime = datetime.min, **_):
"""Get the locations from the external database and check for updates"""
locations: List[ExternalLocation] = ExternalLocation.query.filter(
ExternalLocation.updated_at_ >= last_run
).order_by(ExternalLocation.id).all()
app.logger.info(f"ExternalLocation: collected {len(locations)} updated locations")
for update_location in locations:
existing_location: Location = Location.query.filter(
Location.external_id == update_location.id
).first()
if existing_location is None:
self.session.add(Location.from_worker(update_location))
else:
existing_location.update_from_worker(update_location)
问题是当我 运行 这个作业时, Location
对象没有与 JobRecord
一起提交,所以只创建了后者。如果我用调试器跟踪它,Location.query.count()
returns 函数内部的正确值,但是一旦它进入 on_success
回调,它就回到 0,并且 self._session.new
returns 一个空字典。
我已经尝试将会话添加为 属性 以确保它在任何地方都是相同的实例,但问题仍然存在。也许它与 scoped_session
有关,因为 Flask-SQLAlchemy
?
抱歉,代码量太大,我确实尝试了尽可能多的剥离。欢迎任何帮助!
我发现罪魁祸首是 scoped_session
和 Flask 应用上下文的组合。与任何上下文管理器一样,运行 代码 with app.app_context()
在离开时触发了 __exit__
函数,这反过来导致存储 scoped_session
的 ScopedRegistry
成为清除。然后,创建了一个新会话,将 JobRecords
添加到其中,并提交了该会话。因此,位置不会写入数据库。
有两种可能的解决方案。如果您不在任务以外的其他文件中使用会话,则可以向任务添加一个会话 属性。这样,您就可以完全避免 scoped_session
,并且可以在 after_return
函数中进行清理。
@property
def session(self):
if self._session is None:
from dashboard.extensions import db
self._session = db.create_session(options={})()
return self._session
但是,我也通过 from extensions import db
访问了模型定义文件中的会话。因此,我使用了两个不同的会话。我最终使用 app.app_context().push()
而不是上下文管理器,从而避免了 __exit__
函数
app.app_context().push()
super(RecordedTask, self).__call__(*args, **kwargs)
我正在构建一个工具,从不同的数据库中获取数据,运行对其进行格式化,并将其存储在我自己的数据库中。我正在从 APScheduler 迁移到 Celery,但我 运行 遇到了以下问题:
我用一个class我调用JobRecords
来存储作业运行时,是否成功以及遇到了哪些错误。我用它来知道更新的条目不要回头太远,尤其是因为有些表有数百万行。
由于所有作业的系统都相同,我从 celery Task
对象创建了一个 subclass。我确保该作业在 Flask 应用程序上下文中执行,并且我获取该作业成功完成的最新时间。我还确保我为 now
注册了一个值,以避免查询数据库和添加工作记录之间的时间问题。
class RecordedTask(Task):
"""
Task sublass that uses JobRecords to get the last run date
and add new JobRecords on completion
"""
now: datetime = None
ignore_result = True
_session: scoped_session = None
success: bool = True
info: dict = None
@property
def session(self) -> Session:
"""Making sure we have one global session instance"""
if self._session is None:
from app.extensions import db
self._session = db.session
return self._session
def __call__(self, *args, **kwargs):
from app.models import JobRecord
kwargs['last_run'] = (
self.session.query(func.max(JobRecord.run_at_))
.filter(JobRecord.job_id == self.name, JobRecord.success)
.first()
)[0] or datetime.min
self.now = kwargs['now'] = datetime.utcnow()
with app.app_context():
super(RecordedTask, self).__call__(*args, **kwargs)
def on_failure(self, exc, task_id, args: list, kwargs: dict, einfo):
self.session.rollback()
self.success = False
self.info = dict(
args=args,
kwargs=kwargs,
error=exc.args,
exc=format_exception(exc.__class__, exc, exc.__traceback__),
)
app.logger.error(f"Error executing job '{self.name}': {exc}")
def on_success(self, retval, task_id, args: list, kwargs: dict):
app.logger.info(f"Executed job '{self.name}' successfully, adding JobRecord")
for entry in self.to_trigger:
if len(entry) == 2:
job, kwargs = entry
else:
job, = entry
kwargs = {}
app.logger.info(f"Scheduling job '{job}'")
current_celery_app.signature(job, **kwargs).delay()
def after_return(self, *args, **kwargs):
from app.models import JobRecord
record = JobRecord(
job_id=self.name,
run_at_=self.now,
info=self.info,
success=self.success
)
self.session.add(record)
self.session.commit()
self.session.remove()
我添加了一个工作示例来更新一个名为 Location
的模型,但是像这个工作一样有很多工作。
@celery.task(bind=True, name="update_locations")
def update_locations(self, last_run: datetime = datetime.min, **_):
"""Get the locations from the external database and check for updates"""
locations: List[ExternalLocation] = ExternalLocation.query.filter(
ExternalLocation.updated_at_ >= last_run
).order_by(ExternalLocation.id).all()
app.logger.info(f"ExternalLocation: collected {len(locations)} updated locations")
for update_location in locations:
existing_location: Location = Location.query.filter(
Location.external_id == update_location.id
).first()
if existing_location is None:
self.session.add(Location.from_worker(update_location))
else:
existing_location.update_from_worker(update_location)
问题是当我 运行 这个作业时, Location
对象没有与 JobRecord
一起提交,所以只创建了后者。如果我用调试器跟踪它,Location.query.count()
returns 函数内部的正确值,但是一旦它进入 on_success
回调,它就回到 0,并且 self._session.new
returns 一个空字典。
我已经尝试将会话添加为 属性 以确保它在任何地方都是相同的实例,但问题仍然存在。也许它与 scoped_session
有关,因为 Flask-SQLAlchemy
?
抱歉,代码量太大,我确实尝试了尽可能多的剥离。欢迎任何帮助!
我发现罪魁祸首是 scoped_session
和 Flask 应用上下文的组合。与任何上下文管理器一样,运行 代码 with app.app_context()
在离开时触发了 __exit__
函数,这反过来导致存储 scoped_session
的 ScopedRegistry
成为清除。然后,创建了一个新会话,将 JobRecords
添加到其中,并提交了该会话。因此,位置不会写入数据库。
有两种可能的解决方案。如果您不在任务以外的其他文件中使用会话,则可以向任务添加一个会话 属性。这样,您就可以完全避免 scoped_session
,并且可以在 after_return
函数中进行清理。
@property
def session(self):
if self._session is None:
from dashboard.extensions import db
self._session = db.create_session(options={})()
return self._session
但是,我也通过 from extensions import db
访问了模型定义文件中的会话。因此,我使用了两个不同的会话。我最终使用 app.app_context().push()
而不是上下文管理器,从而避免了 __exit__
函数
app.app_context().push()
super(RecordedTask, self).__call__(*args, **kwargs)