Dask "no module named xxxx" 错误

Dask "no module named xxxx" error

我尝试使用 dask distributed 提交位于另一个名为 worker.py 的文件中的函数。 在工人中我有以下错误:

No module named 'worker'

但是我无法弄清楚我在这里做错了什么...

这是我的代码示例:

import worker

def run(self):
    dask_queue = queue.Queue()
    remote_queue = self.executor.scatter(dask_queue)
    map_queue = self.executor.map(worker.run, remote_queue)
    result = self.executor.gather(map_queue)

    # Load data into the queue
    for option in self.input.get_next_option():
        remote_queue.put([self.server, self.arg, option])

这是在工作端获得的完整回溯:

distributed.core - INFO - Failed to deserialize b'\x80\x04\x95\x19\x00\x00\x00\x00\x00\x00\x00\x8c\x06worker\x94\x8c\nrun\x94\x93\x94.' Traceback (most recent call last): File "/usr/local/lib/python3.5/dist-packages/distributed/core.py", line 74, in loads return pickle.loads(x) ImportError: No module named 'worker' distributed.worker - WARNING - Could not deserialize task Traceback (most recent call last): File "/usr/local/lib/python3.5/dist-packages/distributed/worker.py", line 496, in compute_one task) File "/usr/local/lib/python3.5/dist-packages/distributed/worker.py", line 284, in deserialize function = loads(function) File "/usr/local/lib/python3.5/dist-packages/distributed/core.py", line 74, in loads return pickle.loads(x) ImportError: No module named 'worker'

编辑:请参阅 MRocklin 评论以获得更清洁的解决方案

实际上,如果在 dask worker 中执行的代码在外部模块中,则必须从 dask worker 路径中知道它(它不是从客户端序列化到 worker)。

更改我的 PYTHONPATH 以确保工作人员知道该模块解决了问题。 在 dask 问题中发布了类似的问题:

https://github.com/dask/distributed/issues/344

我也遇到了类似的问题。创建 dask graph 时使用了 python 模块中的函数。但是,工作进程找不到 python 模块。

工作人员控制台中出现以下错误。这里,tasks.py 包含了 dask graph 中使用的 worker 函数。

[ worker 10.0.2.4 ] : ModuleNotFoundError: No module named 'tasks'
[ worker 10.0.2.4 ] : distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95\x14\x00\x00\x00\x00\x00\x00\x00\x8c\x05tasks\x94\x8c\x06ogs_mk\x94\x93\x94.'

当使用 Client.upload_file(如下所示)将本地 python 模块发送给工作人员时,问题得到解决。

client.upload_file('tasks.py')     ## Send local package to workers
results = client.get(dsk, 'root')  ## get the results

这个问题可能有两种情况:调用dask-distributed函数的主代码中的import没有找到,或者dask-distributed函数内部的import没有找到。无论哪种方式,解决方案是更新 sys.path 以指向这些模块所在的位置。

就我而言,我更新了两者。

例如,假设在您的主脚本中有模块 xxx,而在您要分发的 dask 函数中有模块 yyy。罐头应该是这样的:

from dask.distributed import Client
import sys

def update_syspath():
  sys.path.insert(0, 'module_xxx_location')

# you have to update sys.path first before import the xxx module
import xxx

def dask_function():
  sys.path.insert(0, 'module_yyy_location')
  import yyy

client.submit(dask_function, params)