如何用Dask做复合并行(类似于MPI+OpenMP)

How to do composite parallelism (similar to MPI+OpenMP) with Dask

我刚刚开始学习 Dask 并了解它在令人尴尬的并行任务中的应用。我有一个函数可以从单个文件读取数据并对该数据执行 long-运行 计算。我通过与 joblib 并行化来加快计算速度。

我现在想使用 Dask 将其扩展到多台分布式机器。我想请求一些节点并让每个节点 machine/node 处理文件池中的一个文件并 return 结果。我希望每个文件的处理都利用某种本地节点上并行性。

如果这是 MPI+OpenMP,我会在每台机器上有一个 运行k,每个 运行k 的物理核心数作为 OpenMP 线程。使用 Dask,我只看到如何创建一个巨大的工作池,这些工作池将共享每个文件或所有文件的处理。我想要复合并行性(每个节点一个文件,每个节点的 ncore 进程帮助处理每个文件)

我尝试用 dask 调用一个 joblib 函数,但它没有利用每台机器上的所有内核。我也看不到如何通过 client.submit 将提交的任务固定到某台机器。

import joblib
import itertools
import numpy as np


#world's stupidest function as a simple illustrative example, matrix is read in from a file and this function is called on many different pairs in conjunction with the matrix to create a very expensive computation. parallelization over the list of pairs is trivial 
def example(matrix, pair):
    for i in range(100000): #takes almost no time
    #for i in range(10000000): #takes a long time
        x=np.exp(100)
    return pair[0]+pair[1]+pair[2]

def my_parallel_example(matrix, pairs, num_jobs):
    results= joblib.Parallel(n_jobs=num_jobs, verbose=10)(joblib.delayed(example)(matrix, pair) for pair in pairs) 
    return results


from dask_jobqueue import SGECluster

cores_per_node=24

cluster = SGECluster(
    cores=1,
    dashboard_address=':0',
    job_extra=['-pe {} {}'.format(parallel_environment, cores_per_node), '-j y', '-o /dev/null'],
    local_directory='$TMPDIR',
    memory=100 GiB,
    processes=cores_per_node,
    project=project_name,
    walltime='00:30:00'
)

#just requesting one 24-core machine
requested_cores=24
cluster.scale(requested_nodes)
client.wait_for_workers(requested_nodes)

matrix=None
possibilities=[1, 2, 3]
pairs = list(itertools.product(possibilities, possibilities, possibilities))
num_jobs=10
c=client.submit(my_parallel_bfs, matrix, pairs, num_jobs)

最终我想要这样的东西,但我无法弄清楚语法:

matrices=[mat1, mat2, ...] #each matrix read from a seperate file and added to a pool of 'big' jobs to be tackled by a node
results=[]
for matrix in matrices: 
    c=client.submit(my_parallel_bfs, matrix, pairs, num_jobs) #each job is submitted to a node that then uses several cores/processes to compute the result corresponding to each file. 
    results.append(c.result() )

我在网上看到的所有内容似乎都使用分布式 dask 进行令人尴尬的并行作业,而且我没有看到每个节点嵌套一个大计算并在每个大计算上使用 ncore 子进程的应用示例。

我希望上面的代码能够像我直接 运行 时一样有效地利用 10 个内核而没有 dask,但是通过 client.submit() 提交它似乎只使用一个内核.而且我不知道如何以这样一种方式扩展到多台机器,即每台机器无需在矩阵池中的一个矩阵上进行通信即可工作。

所以在纠结了很长时间,并在网上发现了很多关于混合 joblib/Dask(例如 https://github.com/joblib/joblib/issues/875)的问题后,我想出了一个足够好的解决方法并且可以 post 部分答案。

理想情况下,我想使用 dask 提交独立的 "parent" 任务到单独的客户端(机器),并让每个任务调用一个与 joblib 并行的计算密集型函数(每个核心使用一个 joblib worker)。最终,即使在考虑了各种选项(joblib 中的线程与进程、每台机器的进程数等)之后,我也从未看到过良好的性能。让 dask 调用 joblib 杀死了我的并行性,也许引擎盖下有一些锁定机制。

相反,我重写了程序以完全使用 Dask(每个内核一个 Dask worker,而不是每台机器一个 worker)并去掉了 joblib。

cores_per_node=24

cluster = SGECluster(
    cores=cores_per_node,
    dashboard_address=':0',
    job_extra=['-pe {} {}'.format(parallel_environment, cores_per_node), '-j y', '-o /dev/null'],
    local_directory='$TMPDIR',
    memory=100 GiB,
    processes=cores_per_node,
    project=project_name,
    walltime='00:30:00'
)

requested_nodes=2
total_workers=cores_per_node*requested_nodes


cluster.scale(total_workers)
client.wait_for_workers(total_workers)

此时我请求了 2 台 24 核机器,而 Dask 有 48 名工人,所以每个核心一个工人。

#I was not able to distribute one "file" of work to a machine, but I was able to have multiple machines process one file. 
info=dict #initialize empty dictionary
for file in list_of_files:
    matrix=np.load(file)
    remote_matrix=client.scatter(matrix) #necessary to prevent any dask warnings and seemed to improve performance by reducing communication. each worker 
    futures = []
    for pair in pairs:
        #now each pair is a dask job, previously this was sent as jobs to joblib
        futures.append(client.submit(example, remote_matrix, pair, workers=first_host))
    tmp_result=client.gather(futures) #blocks until all jobs completed
    info[file]=tmp_result #store the results for one file

鉴于此特定问题的性质,此解决方法只是一个合理的选择。它是令人尴尬的并行,真正 不需要两级并行,而是可以让多台机器一次处理一个文件。我仍然不确定我将如何处理我真正需要每台机器一个 "parent" 工作人员启动子进程到各自核心的任务(除了切换到 C/Fortran/C++ 并使用 MPI/OMP)。

即使纯粹使用 Dask,我也看不到一种创建 parent/child 关系的方法,而不需要一些 hack(例如,创建客户列表而不是使用 scale_up)让我控制tasks/subtasks 在多台机器上的物理位置。