芹菜生产优雅重启

Celery Production Graceful Restart

我需要重新启动 celery 守护进程,但我需要它告诉当前工作人员在任务完成时关闭,然后在旧工作人员仍在关闭时启动一组新工作人员。

守护进程的当前优雅选项等待所有任务完成后再重新启动,这在您有很长的 运行 作业时没有用。

请不要建议自动重新加载,因为它目前在 4.0.2 中没有记录。

好吧,我最终做的是使用 supervisord 和 ansible 来管理它。

[program:celery_worker]
# Max concurrent task you wish to run.
numprocs=5
process_name=%(program_name)s-%(process_num)s
directory=/opt/worker/main
# Ignore this unless you want to use virtualenvs.
environment=PATH="/opt/worker/main/bin:%(ENV_PATH)s"
command=/opt/worker/bin/celery worker -n worker%(process_num)s.%%h --app=python --time-limit=3600 -c 5 -Ofair -l debug --config=celery_config -E
stdout_logfile=/var/log/celery/%(program_name)s-%(process_num)s.log
user=worker_user
autostart=true
autorestart=true
startretries=99999
startsecs=10
stopsignal=TERM
stopwaitsecs=7200
killasgroup=false

您可以使用 supervisor 来 stop/start 工作人员加载新代码,但它会等待所有工作人员停止后再重新启动它们,这对于长时间的 运行 工作来说效果不佳。最好只是终止 MainProcesses,这将告诉工作人员停止接受工作并在完成时关闭。

ps aux | grep *celery.*MainProcess | awk '{print }' | xargs kill -TERM

Supervisor 将在它们死亡时重新启动它们。

当然,在不完全停止所有工作人员的情况下更新依赖项是几乎不可能的,这为使用 docker 这样的东西提供了一个很好的案例。 ;)

在 Celery4 上,我必须修补基本任务 class 才能使其正常工作。 Source

import signal
from celery import Celery, Task
from celery.utils.log import get_task_logger
logger = get_task_logger('my_logger')

class MyBaseTask(Task):
    def __call__(self, *args, **kwargs):
        signal.signal(signal.SIGTERM,
                      lambda signum, frame: logger.info('SIGTERM received, 
                                            wait till the task finished'))
        return super().__call__(*args, **kwargs)

app = Celery('my_app')
app.Task = MyBaseTask

还有一个 patch 可以防止在警告关闭时重新安排

我们的任务可能 运行 长达 48 小时。当我们发布新版本并将新版本部署到生产环境时,您所说的优雅重启非常常见。我们所做的只是将 SIGTERM(关闭)信号发送给 运行ning 工作人员,然后并行启动全新的工作人员集。