如何在工作人员关闭时向 运行 任务发送消息?
How to send a message to a running task on worker shutdown?
我有一个任务可能需要几个小时才能完成。但是,可以通过在 memcached 中设置一个标志来指示它尽快干净地退出。任务定期轮询标志,当发现时,任务保存其状态并干净地退出。如果我只是手动在缓存中设置标志,这一切都可以正常工作。
我想用它来及时关闭 worker,我只需要知道将标志设置逻辑放在哪里。这是我尝试过的:
worker_shutdown
处理程序
我试着把它放在 worker_shutdown
处理程序中,但它看起来只在 所有任务完成后 才被调用,所以它无助于关闭我的long-运行 task.
信号处理程序通过 celery.platforms.signals
我尝试使用 celery.platforms.signals
接口附加 TERM 和 INT 信号处理程序,但从未调用过我的处理程序:
@worker_init.connect
def handle_worker_init(*args, **kwargs):
platforms.signals["TERM"] = handle_worker_shutdown
platforms.signals["INT"] = handle_worker_shutdown
def handle_worker_shutdown(*args, **kwargs):
logger.info("Setting database cleanup stop flag to save our ship")
database_cleanup_stop()
我也在 worker_process_init
中尝试过这样做,但我的处理程序仍然没有被调用。 (即使调用它也会有点令人讨厌,因为每个工作进程都可能会设置标志,但我只需要设置一次。无害,但令人讨厌。)
是否有任何其他信号可以设置我的 "early-exit" 标志?或解决此问题的任何其他建议?
使用worker_shutting_down信号:
from celery.signals import worker_shutting_down
@worker_shutting_down.connect
def worker_shutting_down_handler(sig, how, exitcode, ** kwargs):
print(f 'worker_shutting_down({sig}, {how}, {exitcode})')
当工作人员开始关闭进程时分派。就像一个魅力...
我有一个任务可能需要几个小时才能完成。但是,可以通过在 memcached 中设置一个标志来指示它尽快干净地退出。任务定期轮询标志,当发现时,任务保存其状态并干净地退出。如果我只是手动在缓存中设置标志,这一切都可以正常工作。
我想用它来及时关闭 worker,我只需要知道将标志设置逻辑放在哪里。这是我尝试过的:
worker_shutdown
处理程序
我试着把它放在 worker_shutdown
处理程序中,但它看起来只在 所有任务完成后 才被调用,所以它无助于关闭我的long-运行 task.
信号处理程序通过 celery.platforms.signals
我尝试使用 celery.platforms.signals
接口附加 TERM 和 INT 信号处理程序,但从未调用过我的处理程序:
@worker_init.connect
def handle_worker_init(*args, **kwargs):
platforms.signals["TERM"] = handle_worker_shutdown
platforms.signals["INT"] = handle_worker_shutdown
def handle_worker_shutdown(*args, **kwargs):
logger.info("Setting database cleanup stop flag to save our ship")
database_cleanup_stop()
我也在 worker_process_init
中尝试过这样做,但我的处理程序仍然没有被调用。 (即使调用它也会有点令人讨厌,因为每个工作进程都可能会设置标志,但我只需要设置一次。无害,但令人讨厌。)
是否有任何其他信号可以设置我的 "early-exit" 标志?或解决此问题的任何其他建议?
使用worker_shutting_down信号:
from celery.signals import worker_shutting_down
@worker_shutting_down.connect
def worker_shutting_down_handler(sig, how, exitcode, ** kwargs):
print(f 'worker_shutting_down({sig}, {how}, {exitcode})')
当工作人员开始关闭进程时分派。就像一个魅力...