Initializing task module global in dask worker using --preload?
I'm trying to implement something similar to these questions (, ): I have a (relatively) large model that I want to pre-initialize on the subset of workers that will accept the tasks that need it. Ideally, I don't even want the client machine to have the model.

Before finding those questions, my initial attempt was to define a delayed task in a shared module, worker_tasks.model, with a module-level global (e.g. worker_tasks.model.model) that the worker's --preload script assigns and the task then uses. For some reason this doesn't work: the variable gets set in the preload script, but is still None when the task is called.
init_model_worker.py:
import logging
from uuid import uuid4

from worker_tasks import model


def dask_setup(worker):
    # Called in the worker process at startup because of --preload.
    model.model = f'<mock model {uuid4()}>'
    logger = logging.getLogger('distributed')
    logger.warning(f'model = {model.model}')
worker_tasks/model.py:
import logging
import random
from time import sleep
from uuid import uuid4

import dask

model = None


@dask.delayed
def compute_clinical(inp):
    if model is None:
        raise RuntimeError('Model not initialized.')
    sleep(random.uniform(3, 17))
    return {
        'result': random.choice((True, False)),
        'confidence': random.uniform(0, 1)
    }
Here is the worker log when I start it and submit something to the scheduler:
> dask-worker --preload init_model_worker.py tcp://scheduler:8786 --name model-worker
distributed.utils - INFO - Reload module init_model_worker from .py file
distributed.nanny - INFO - Start Nanny at: 'tcp://172.28.0.4:41743'
distributed.diskutils - INFO - Found stale lock file and directory '/worker-epptq9sh', purging
distributed.utils - INFO - Reload module init_model_worker from .py file
distributed - WARNING - model = <mock model faa41af0-d925-46ef-91c9-086093d37c71>
distributed.worker - INFO - Start worker at: tcp://172.28.0.4:37973
distributed.worker - INFO - Listening to: tcp://172.28.0.4:37973
distributed.worker - INFO - nanny at: 172.28.0.4:41743
distributed.worker - INFO - bokeh at: 172.28.0.4:37766
distributed.worker - INFO - Waiting to connect to: tcp://scheduler:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 4
distributed.worker - INFO - Memory: 1.93 GB
distributed.worker - INFO - Local Directory: /worker-mhozo9ru
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://scheduler:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - WARNING - Compute Failed
Function: compute_clinical
args: ('mock')
kwargs: {}
Exception: RuntimeError('Model not initialized.')
You can see that after the preload script is reloaded, model is <mock model faa41af0-d925-46ef-91c9-086093d37c71>; but when I try to access it from the task, I get None.
I'm going to try implementing a solution based on the answers to those other questions, but I have a couple of questions related to worker preloading:
- Why is model still None when the task is called, even though I assigned it in the preload script?
- Is it generally recommended to avoid doing this kind of thing in a worker --preload script? Is it better to drive the initialization of worker state from the client instead? If so, why?
I suspect that the model variable gets bound into your function right away when Python serializes the function. You might try this:
@dask.delayed
def compute_clinical(inp):
    from worker_tasks.model import model
    if model is None:
        raise RuntimeError('Model not initialized.')
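For context, a minimal client-side submission might look like the sketch below (the scheduler address is taken from the worker log above; the actual call site isn't shown in the question). The delayed function is serialized on the client and shipped to the worker along with the task graph, which is where an eagerly captured module global would come along for the ride:

from dask.distributed import Client

from worker_tasks.model import compute_clinical

client = Client('tcp://scheduler:8786')   # scheduler address from the worker log
future = client.compute(compute_clinical('mock'))
print(future.result())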
Alternatively, rather than assigning the value into module-global scope (which can be hard to reason about in Python), try assigning it onto the worker itself:
from uuid import uuid4

import dask
from dask.distributed import get_worker


def dask_setup(worker):
    worker.model = f'<mock model {uuid4()}>'


@dask.delayed
def compute_clinical(inp):
    if get_worker().model is None:
        raise RuntimeError('Model not initialized.')
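As for the second question: if you would rather drive the initialization from the client instead of a --preload script, one option that fits the same pattern is to push a setup function out with client.run; an argument named dask_worker is filled in with the worker instance, so you can attach state to it exactly as above. A minimal sketch, with the model mocked out:

from dask.distributed import Client


def load_model(dask_worker):
    # client.run fills the special "dask_worker" argument with the worker
    # instance, so we can attach state to it as in the snippet above.
    dask_worker.model = '<mock model pushed from the client>'


client = Client('tcp://scheduler:8786')
client.run(load_model)  # runs once on every currently connected worker

Note that client.run only reaches workers that are connected at that moment; Client.register_worker_callbacks (or a WorkerPlugin registered from the client) covers workers that join later as well.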