如何使用工人初始化时使用的变量启动芹菜工人
How to start celery worker with variable used on worker initialization
是否可以选择在启动时将变量传递给 celery worker 并在执行时在 worker 内部使用它?
我正在编写服务器,将负责机器学习训练和评估。我想动态启动 worker 的新实例并将变量传递给它将用于在内部加载特定模型。
我找到了如何使用 worker_main
方法从答案 here 开始工作。
我在考虑两种解决方案:
设置为环境变量。此解决方案的问题在于,当同时创建两个 worker 实例时,它可能会被破坏。
将其作为 argv 传递,但我不知道如何读取 worker 中的变量。
编辑
我找到了 线程,但它只讨论在任务中访问自定义参数。我的问题是关于在 worker 初始化时访问它。
受 this 线程的启发,我将尝试使用 celery 信号。
http://docs.celeryproject.org/en/latest/userguide/signals.html#worker-init
当子进程为 forked/spawned 时,环境变量从父进程复制到子进程。这意味着该进程可以操纵自己的变量,但其他进程不会(这是可能的,但例外 - 阅读此线程了解一些背景知识:Is there a way to change the environment variables of another process in Unix?)
如果担心您自己的代码中存在竞争条件,您应该考虑对更改父项 os.environ
并生成工作程序的部分进行锁定。在 worker 作为一个单独的进程产生后,释放锁,你就不用担心通过修改父进程的环境来破坏子进程了。
也许我的问题不够准确,但我自己通过文档和 Whosebug 线程找到了答案。
我想 运行 为 Keras 模型分离 worker。在 worker 初始化中,我需要将模型加载到内存中,在任务中,模型用于预测。
我的解决方案:
- 用 model_id 命名工人(因为 id 是唯一的,我每个模型只需要一个工人)
- 在celeryd_after_setup信号函数中我解析了名称并在worker
中设置了全局变量
- 在 worker_process_init 信号函数上,我在我的案例中加载了模型,它是 Grasper 中的静态字段 class
- 在任务中我使用了 Grasper 的静态字段 class
下面是一些准确描述解决方案的代码。
from celery.signals import worker_process_init, celeryd_after_setup
from celery.concurrency import asynpool
# my custom class containing static fields for model and tokenizer
# it also can be global variable as model_id
from myapp.ml import Grasper
# set to have some time for model loading otherwise worker_process_init can terminate
asynpool.PROC_ALIVE_TIMEOUT = 100.0
model_id = None
@celeryd_after_setup.connect()
def set_model_id(sender, instance, **kwargs):
global model_id
model_id = instance.hostname.split('@')[1]
@worker_process_init.connect()
def configure_worker(signal=None, sender=None, **kwargs):
Grasper.load_model(model_id)
然后在 celery 任务中,您可以使用 Grasper class 加载模型。
该解决方案有效,但我知道还有改进的地方,所以如果您有任何想法,请发表评论。
是否可以选择在启动时将变量传递给 celery worker 并在执行时在 worker 内部使用它?
我正在编写服务器,将负责机器学习训练和评估。我想动态启动 worker 的新实例并将变量传递给它将用于在内部加载特定模型。
我找到了如何使用 worker_main
方法从答案 here 开始工作。
我在考虑两种解决方案:
设置为环境变量。此解决方案的问题在于,当同时创建两个 worker 实例时,它可能会被破坏。
将其作为 argv 传递,但我不知道如何读取 worker 中的变量。
编辑
我找到了
受 this 线程的启发,我将尝试使用 celery 信号。 http://docs.celeryproject.org/en/latest/userguide/signals.html#worker-init
当子进程为 forked/spawned 时,环境变量从父进程复制到子进程。这意味着该进程可以操纵自己的变量,但其他进程不会(这是可能的,但例外 - 阅读此线程了解一些背景知识:Is there a way to change the environment variables of another process in Unix?)
如果担心您自己的代码中存在竞争条件,您应该考虑对更改父项 os.environ
并生成工作程序的部分进行锁定。在 worker 作为一个单独的进程产生后,释放锁,你就不用担心通过修改父进程的环境来破坏子进程了。
也许我的问题不够准确,但我自己通过文档和 Whosebug 线程找到了答案。
我想 运行 为 Keras 模型分离 worker。在 worker 初始化中,我需要将模型加载到内存中,在任务中,模型用于预测。
我的解决方案:
- 用 model_id 命名工人(因为 id 是唯一的,我每个模型只需要一个工人)
- 在celeryd_after_setup信号函数中我解析了名称并在worker 中设置了全局变量
- 在 worker_process_init 信号函数上,我在我的案例中加载了模型,它是 Grasper 中的静态字段 class
- 在任务中我使用了 Grasper 的静态字段 class
下面是一些准确描述解决方案的代码。
from celery.signals import worker_process_init, celeryd_after_setup
from celery.concurrency import asynpool
# my custom class containing static fields for model and tokenizer
# it also can be global variable as model_id
from myapp.ml import Grasper
# set to have some time for model loading otherwise worker_process_init can terminate
asynpool.PROC_ALIVE_TIMEOUT = 100.0
model_id = None
@celeryd_after_setup.connect()
def set_model_id(sender, instance, **kwargs):
global model_id
model_id = instance.hostname.split('@')[1]
@worker_process_init.connect()
def configure_worker(signal=None, sender=None, **kwargs):
Grasper.load_model(model_id)
然后在 celery 任务中,您可以使用 Grasper class 加载模型。 该解决方案有效,但我知道还有改进的地方,所以如果您有任何想法,请发表评论。