Parallelize/distribute subprocess calls in dask, one per worker?
I have the following code that walks over each project directory, calls an external executable, and writes the result to an out* file.
import os
import subprocess

from dask_jobqueue import PBSCluster
cluster = PBSCluster()
cluster.scale(jobs=3)

from dask.distributed import Client
client = Client(cluster)
...

r_path = '/path/to/project/folder'

def func():
    f = open('out', 'w')  # In the project, out-file naming is customized per directory
    subprocess.call(["/path/to/executable/file"], stdout=f)

for root, dirs, files in os.walk("."):
    for name in dirs:
        os.chdir(r_path + '/' + str(name))
        func()
This code runs sequentially, but I would like to run it in parallel, with each subprocess call going to one dask worker.
Note: the subprocess.call is the same (same executable) for all the different directories.
I tried
def func():
    f = open('out', 'w')  # In the project, out-file naming is customized per directory
    func.file = subprocess.call(["/path/to/executable/file"], stdout=f)

arg = [func.file]
workers = client.scheduler_info()['workers']
tasks = [client.submit(func, arg, workers=worker) for arg, worker in zip(args, workers)]
and also this (which probably doesn't use dask to distribute/parallelize at all):
def func():
    f = open('out', 'w')
    with io.open(f, mode='wb') as out:
        p = subprocess.Popen(["/path/to/executable/file"], stdout=out, stderr=out)
        child_processes.append(p)

for cp in child_processes:
    cp.wait()
but I couldn't parallelise/distribute the subprocess calls.
Could someone help me parallelize these subprocess calls, one call per worker, so that things execute faster?
Thanks in advance!
In general, your first non-dask attempt shows the easiest pattern to parallelise. However, I would warn against mutating global state with os.chdir - instead, refer to the output file by its full path and pass the working directory to the subprocess:
import os
import subprocess
import dask

r_path = '/path/to/project/folder'

def func(path):
    # Write the output into the project directory by full path, and let the
    # subprocess itself run inside that directory via cwd=.
    with open(os.path.join(path, 'out'), 'w') as f:
        subprocess.call(["/path/to/executable/file"], stdout=f, cwd=path)

out = []
for root, dirs, files in os.walk("."):
    for name in dirs:
        path = r_path + '/' + str(name)
        out.append(dask.delayed(func)(path))

dask.compute(*out)
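Because a distributed Client is already connected, dask.compute will run these delayed calls on your PBS workers. If you prefer the futures API you tried with client.submit, the same idea can be expressed with client.map/client.gather; this is a minimal sketch, assuming func and r_path as defined above and the Client from your question:

paths = [r_path + '/' + str(name)
         for root, dirs, files in os.walk(".")
         for name in dirs]

futures = client.map(func, paths)  # one subprocess call per task, spread across workers
client.gather(futures)             # block until every call has finished

You do not normally need to pin tasks to specific workers; the scheduler balances them across whatever workers the PBS jobs provide.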