如何正确使用 dask 的 upload_file() 将本地代码传递给 worker

How to properly use dask's upload_file() to pass local code to workers

我在 local_code.py 文件中有函数,我想通过 dask 传递给工作人员。我在这里看到问题的答案说这可以使用 upload_file() 函数来完成,但我似乎无法让它工作,因为我仍然得到 ModuleNotFoundError.

相关部分代码如下

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from local_code import *
helper_file = '/absolute/path/to/local_code.py'

def main():
    with SLURMCluster(**slurm_params) as cluster:

        cluster.scale(n_workers)

        with Client(cluster) as client:
            client.upload_file(helper_file)
            mapping = client.map(myfunc, data)
            client.gather(mapping)

if __name__ == '__main__':
    main()

请注意,myfunc 是从 local_code 导入的,并且将其导入地图不会出错。函数 myfunc 还依赖于 local_code.

中定义的其他函数

使用此代码,我仍然收到此错误

distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95+\x00\x00\x00\x00\x00\x00\x00\x8c\x11local_code\x94\x8c\x$
Traceback (most recent call last):
  File "/home/gallagher.r/.local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 61, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'local_code'

使用 upload_file() 似乎很简单,我不确定我做错了什么。我一定是把它放在了错误的地方,或者没有正确理解传递给它的内容。

如果您对此有任何帮助,我将不胜感激。如果您需要任何其他信息或者我可以从错误文件中提供任何其他信息,请告诉我。

upload_file方法只将文件上传到当前可用的worker。如果工作人员在您致电 upload_file 后到达,则该工作人员将没有提供的文件。

如果您遇到这种情况,最简单的做法可能是等到所有工作人员都到达后再调用上传文件

cluster.scale(n)
with Client(cluster) as client:
    client.wait_for_workers(n)
    client.upload_file(...)

当你有工作人员 in/out 时,另一个选择是使用 Client.register_worker_callbacks 在新工作人员 registered/added 时挂钩。一个警告是您需要在回调部分中序列化您的文件:

fname = ...
with open(fname, 'rb') as f:
  data = f.read()

client.register_worker_callbacks(
  setup=functools.partial(
    _worker_upload, data=data, fname=fname,
  )
)

def _worker_upload(dask_worker, *, data, fname):
  dask_worker.loop.add_callback(
    callback=dask_worker.upload_file,
    comm=None,  # not used
    filename=fname,
    data=data,
    load=True)

这也会在第一次注册回调时上传文件,这样您就可以完全避免调用 client.upload_file