如何使用工人初始化时使用的变量启动芹菜工人

How to start celery worker with variable used on worker initialization

是否可以选择在启动时将变量传递给 celery worker 并在执行时在 worker 内部使用它?

我正在编写服务器,将负责机器学习训练和评估。我想动态启动 worker 的新实例并将变量传递给它将用于在内部加载特定模型。

我找到了如何使用 worker_main 方法从答案 here 开始工作。

我在考虑两种解决方案:

  1. 设置为环境变量。此解决方案的问题在于,当同时创建两个 worker 实例时,它可能会被破坏。

  2. 将其作为 argv 传递,但我不知道如何读取 worker 中的变量。


编辑

我找到了 线程,但它只讨论在任务中访问自定义参数。我的问题是关于在 worker 初始化时访问它。

this 线程的启发,我将尝试使用 celery 信号。 http://docs.celeryproject.org/en/latest/userguide/signals.html#worker-init

当子进程为 forked/spawned 时,环境变量从父进程复制到子进程。这意味着该进程可以操纵自己的变量,但其他进程不会(这是可能的,但例外 - 阅读此线程了解一些背景知识:Is there a way to change the environment variables of another process in Unix?

如果担心您自己的代码中存在竞争条件,您应该考虑对更改父项 os.environ 并生成工作程序的部分进行锁定。在 worker 作为一个单独的进程产生后,释放锁,你就不用担心通过修改父进程的环境来破坏子进程了。

也许我的问题不够准确,但我自己通过文档和 Whosebug 线程找到了答案。

我想 运行 为 Keras 模型分离 worker。在 worker 初始化中,我需要将模型加载到内存中,在任务中,模型用于预测。

我的解决方案:

  1. model_id 命名工人(因为 id 是唯一的,我每个模型只需要一个工人)
  2. celeryd_after_setup信号函数中我解析了名称并在worker
  3. 中设置了全局变量
  4. worker_process_init 信号函数上,我在我的案例中加载了模型,它是 Grasper 中的静态字段 class
  5. 在任务中我使用了 Grasper 的静态字段 class

下面是一些准确描述解决方案的代码。

from celery.signals import worker_process_init, celeryd_after_setup
from celery.concurrency import asynpool

# my custom class containing static fields for model and tokenizer
# it also can be global variable as model_id
from myapp.ml import Grasper

# set to have some time for model loading otherwise worker_process_init can terminate
asynpool.PROC_ALIVE_TIMEOUT = 100.0
model_id = None

@celeryd_after_setup.connect()
def set_model_id(sender, instance, **kwargs):
    global model_id
    model_id = instance.hostname.split('@')[1]

@worker_process_init.connect()
def configure_worker(signal=None, sender=None, **kwargs):
    Grasper.load_model(model_id)

然后在 celery 任务中,您可以使用 Grasper class 加载模型。 该解决方案有效,但我知道还有改进的地方,所以如果您有任何想法,请发表评论。