Parallelize/distribute subprocess calls in dask, one call per worker?

I have the following code, which walks through every project directory, calls an external executable, and writes the result to an out* file.

from dask_jobqueue import PBSCluster   
cluster = PBSCluster()
cluster.scale(jobs=3)  

from dask.distributed import Client
client = Client(cluster)
...

import os
import subprocess

r_path = '/path/to/project/folder'


def func():
    # In the real project, the out file name is customised per directory
    with open('out', 'w') as f:
        subprocess.call(["/path/to/executable/file"], stdout=f)

for root, dirs, files in os.walk("."):
    for name in dirs:
        os.chdir(r_path+'/'+str(name))
        func()

This code runs sequentially, but I would like to run it in parallel, i.e. one dask worker per subprocess call.

Note: the subprocess.call is the same (same executable) for all the different directories.

This is what I tried:

def func():
    f = open('out', 'w') # In project, customized out_file naming based on different dir's
    func.file = subprocess.call(["/path/to/executable/file"], stdout=f)


args = [func.file]
workers = client.scheduler_info()['workers']
tasks = [client.submit(func, arg, workers=worker) for arg, worker in zip(args, workers)]

And also this (which probably does not use dask to distribute/parallelize at all):

child_processes = []

def func():
    # open the output file and hand it to the child process
    with open('out', 'wb') as out:
        p = subprocess.Popen(["/path/to/executable/file"], stdout=out, stderr=out)
        child_processes.append(p)

for cp in child_processes:
    cp.wait()

But I could not get the subprocess calls to parallelise/distribute.

Could someone help me parallelise these subprocess calls, one call per worker, so that things execute faster?

Thanks in advance!

In general, your first non-dask attempt shows the easiest pattern to parallelise. However, I would warn against relying on global state with os.chdir; instead, refer to the output file by its full path and pass the working directory to the subprocess:

import os
import subprocess
import dask

r_path = '/path/to/project/folder'

def func(path):
    # write the output next to the directory it belongs to, and run the
    # executable with that directory as its working directory
    with open(os.path.join(path, 'out'), 'w') as f:
        subprocess.call(["/path/to/executable/file"], stdout=f, cwd=path)

out = []
for root, dirs, files in os.walk("."):
    for name in dirs:
        path = r_path + '/' + str(name)
        out.append(dask.delayed(func)(path))

dask.compute(*out)
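
The same work can also be driven through the client's futures API instead of dask.delayed, which matches the 'one call per worker' phrasing of the question more directly. The following is only a minimal sketch that assumes the client, r_path and func(path) defined above are reused; dask decides which worker runs each task, and because a Client is active it is also the scheduler that dask.compute above uses.

# Minimal futures-based sketch (assumes client, r_path and func from above)
paths = [os.path.join(r_path, name)
         for root, dirs, files in os.walk(".")
         for name in dirs]

futures = client.map(func, paths)  # one task per directory, spread across the workers
client.gather(futures)             # block until every subprocess call has finished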