如何在工作人员关闭时向 运行 任务发送消息?

How to send a message to a running task on worker shutdown?

我有一个任务可能需要几个小时才能完成。但是,可以通过在 memcached 中设置一个标志来指示它尽快干净地退出。任务定期轮询标志,当发现时,任务保存其状态并干净地退出。如果我只是手动在缓存中设置标志,这一切都可以正常工作。

我想用它来及时关闭 worker,我只需要知道将标志设置逻辑放在哪里。这是我尝试过的:

worker_shutdown 处理程序

我试着把它放在 worker_shutdown 处理程序中,但它看起来只在 所有任务完成后 才被调用,所以它无助于关闭我的long-运行 task.

信号处理程序通过 celery.platforms.signals

我尝试使用 celery.platforms.signals 接口附加 TERM 和 INT 信号处理程序,但从未调用过我的处理程序:

@worker_init.connect
def handle_worker_init(*args, **kwargs):
    platforms.signals["TERM"] = handle_worker_shutdown
    platforms.signals["INT"] = handle_worker_shutdown


def handle_worker_shutdown(*args, **kwargs):
    logger.info("Setting database cleanup stop flag to save our ship")
    database_cleanup_stop()

我也在 worker_process_init 中尝试过这样做,但我的处理程序仍然没有被调用。 (即使调用它也会有点令人讨厌,因为每个工作进程都可能会设置标志,但我只需要设置一次。无害,但令人讨厌。)


是否有任何其他信号可以设置我的 "early-exit" 标志?或解决此问题的任何其他建议?

使用worker_shutting_down信号:

from celery.signals import worker_shutting_down

@worker_shutting_down.connect
def worker_shutting_down_handler(sig, how, exitcode, ** kwargs):
    print(f 'worker_shutting_down({sig}, {how}, {exitcode})')

当工作人员开始关闭进程时分派。就像一个魅力...