集群上的 SLURM 和 Python 多处理池

SLURM and Python multiprocessing pool on a cluster

我正在尝试 运行 SLURM 集群 (4x raspberry Pi 3) 上的一个简单并行程序,但没有成功。我一直在阅读它,但我就是无法让它工作。问题如下:

我有一个名为 remove_duplicates_in_scraped_data.py 的 Python 程序。该程序在单个节点 (node=1xraspberry pi) 上执行,程序内部有一个多处理循环部分,如下所示:

pool = multiprocessing.Pool()
input_iter= product(FeaturesArray_1, FeaturesArray_2, repeat=1)
results = pool.starmap(refact_featureMatch, input_iter)

这个想法是,当它到达程序的那一部分时,它应该分配计算,迭代器中每个元素一个线程,最后合并结果。 因此,程序 remove_duplicates_in_scraped_data.py 运行s 一次(不是多次)并且它在池计算期间产生不同的线程。

在单台机器上(不使用 SLURM)它工作得很好,对于 raspberry pi 的特殊情况,它产生 4 个线程,进行计算,将其保存在结果中并继续程序作为一个线程。

我想利用 SLURM 集群的所有 16 个线程,但我似乎无法让它工作。而且我确信集群已正确配置,因为它可以 运行 在集群的所有 16 个线程中使用 SLURM 的所有多处理示例(例如计算 pi 的数字)。

现在,查看 sinfo -N -l 的 SLURM 配置,我们有:

NODELIST   NODES  PARTITION       STATE CPUS    S:C:T MEMORY TMP_DISK WEIGHT AVAIL_FE REASON
node01         1 picluster*        idle    4    4:1:1      1        0      1   (null) none
node02         1 picluster*        idle    4    4:1:1      1        0      1   (null) none
node03         1 picluster*        idle    4    4:1:1      1        0      1   (null) none
node04         1 picluster*        idle    4    4:1:1      1        0      1   (null) none

每个集群报告 4 个插槽,1 个核心和 1 个线程,就 SLURM 而言,有 4 个 CPU。

我希望利用所有 16 个 CPU,如果我 运行 我的程序为:

srun -N 4 -n 16  python3 remove_duplicates_in_scraped_data.py

它只会在每个节点中 运行 主程序的 4 个副本,从而产生 16 个线程。但这不是我想要的。我想要程序的单个实例,然后在集群中生成 16 个线程。至少我们知道 s运行 -N -n 16 集群可以工作。

因此,我尝试按如下方式更改程序:


#!/usr/bin/python3

#SBATCH -p picluster
#SBATCH --nodes=4
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=4
#SBATCH --ntasks-per-socket=1
#SBATCH --sockets-per-node=4

sys.path.append(os.getcwd())

...
...
...
pool = multiprocessing.Pool()
input_iter= product(FeaturesArray_1, FeaturesArray_2, repeat=1)
results = pool.starmap(refact_featureMatch, input_iter)
...
...

并用

执行
sbatch remove_duplicates_in_scraped_data.py

slurm 作业创建成功,我看到集群上所有节点都已分配

PARTITION  AVAIL  TIMELIMIT  NODES  STATE NODELIST
picluster*    up   infinite      4  alloc node[01-04]

该程序在 node01 上作为单个线程启动 运行,但是当它遇到并行部分时,它只在 node01 上生成 4 个线程,而在所有其他节点上什么也没有。

我尝试了不同的设置组合,甚至尝试通过脚本运行它

#!/bin/bash


#SBATCH -p picluster
#SBATCH --nodes=4
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=4
#SBATCH --ntasks-per-socket=1
#SBATCH --ntasks-per-core=1
#SBATCH --sockets-per-node=4

python3 remove_duplicates_in_scraped_data.py

但我无法让它在其他节点上生成。

你能帮帮我吗? 甚至可以这样做吗?即在集群的不同节点上使用 python 的多处理池? 如果没有,我还有哪些其他选择? 该集群还配置了 dask。这样效果会更好吗?

请帮忙,因为我真的被这个问题困住了。

谢谢

Python 多处理包仅限于共享内存并行化。它产生新进程,所有进程都可以访问单台机器的主内存。

您不能简单地将此类软件横向扩展到多个节点上。由于不同的机器没有可以访问的共享内存。

要运行 一次在多个节点上运行您的程序,您应该查看 MPI(消息传递接口)。还有一个python package

根据您的任务,它也可能适合 运行 程序 4 次(因此每个节点一个作业)并让它处理数据的子集。这通常是更简单的方法,但并非总是可行。

所以,我 运行 DASK 与 SLURM 集群和 Python 脚本可以很好地并行化。这需要最少的代码更改。所以上面的multiprocessing pool代码改为:

    cluster = SLURMCluster( header_skip=['--mem'],
                        queue='picluster',
                        cores=4,
                        memory='1GB'
                        )
    cluster.scale(cores=16) #the number of nodes to request
    dask_client = Client(cluster)

    lazy_results=[]
    for pair in input_iter:
      res = dask_client.submit(refact_featureMatch, pair[0], pair[1])
      lazy_results.append(res)

    results = dask_client.gather(lazy_results)

当然可能有更好的方法通过 DASK 来做到这一点。我愿意接受建议:)