检查 celery warm shutdown 是否正在进行任务

Check if celery warm shutdown is in progress from a task

TL;DR

有没有办法判断我们的 celery worker 是否进入热关机状态?换句话说,我可以检查是否有 SIGTERM 未决吗?我有一个重新安排自己的任务,但我想避免在有等待关闭的情况下重新安排自己,以避免延迟热关闭。像这样:

if not self.shutdown_pending():
    self.retry(countdown=5, max_retries=3)

实际上,除了重新安排事情之外,我希望能够在获得 SIGTERM 后立即完全摆脱当前的工作,以便我可以尽快重新启动我的工作人员代码部署:

@app.task(bind=True)
def my_work_task(self):
    work = get_work()
    for item in work:
        if self.shutdown_pending():
            logger.info("Shutdown detected. Bailing.")
            return
        item.process()

背景

我有一个任务需要花费不同的时间(从几秒到几分钟不等)。我正在使用一个分钟的 celery-beat 时间表来最初调用任务,但是如果我只需要少量的工作,比如说,十秒就可以完成,那么我想立即重新调用任务次以避免等待 50 秒等待下一个 celery beat 进入,因为新工作很可能会在该时间段内可用。

所有这一切都是为了尽量减少处理我的工作项目的延迟。我想避免工人坐在那里无所事事的 50 秒时间段,因为在那段时间可能有一些工作可用。请注意,工作变成 "ready" 基于数据库中的 "expiration" 个项目,这就是为什么我使用 celery beat 只是在它们可用时将它们清理干净,而不是直接触发任务。

我的任务看起来像这样:

@app.task(bind=True)
def my_work_task(self):
    work = get_work()
    do_some_work(work)
    # if this was just a short bit of work reschedule ourselves
    # immediately to avoid wasting time waiting for the
    # next celery beat.
    if len(work) < SMALL_WORK_THRESHOLD:
        self.retry(countdown=5, max_retries=3)

这一切都很好,除了一件事:当我重新加载我的工人时(通过发送 SIGTERM),我最终可能会等待一个工人重新安排自己,每次可能都有大量的工作。每次调用可能需要几分钟,直到我达到我的 max_retries 值。这使得部署新代码成为一个问题,因为工作处理几乎会停止长达几分钟。

遗憾的是,没有其他类似 中提到的简单解决方案。

你唯一能做的就是改变方法或使用 SIGKILL,在这种情况下,只需确保使用任务结果后端来了解你可能丢失了哪些任务,或者你是否保留您可能不需要的数据库状态。

根据我的个人经验,我总是使用 mongodb 来注册任务的星星和结束。这让我可以看到由于机器崩溃而从未完成的任务(我使用的是 CELERY_ACKS_LATE),如果我想让某个任务在整个云上一次 运行 一次,还可以执行全局锁定. 这样,如果 SIGTERM 在一定时间后不工作,我发送 SIGKILL 没有丢失工作的风险。

希望对您有所帮助

实际上我遇到了同样的问题并提出了解决方案:

@worker_ready.connect
def my_long_running_task(signal, sender, **kwargs):
    is_running = True

    def get_shutdown_signal(**kwargs):
     # here is the magic with nonelocal keyword
        nonlocal is_running
        is_running = False

    worker_shutting_down.connect(get_shutdown_signal)
    while is_running:
         # do stuff