集群上的 SLURM 和 Python 多处理池
SLURM and Python multiprocessing pool on a cluster
我正在尝试 运行 SLURM 集群 (4x raspberry Pi 3) 上的一个简单并行程序,但没有成功。我一直在阅读它,但我就是无法让它工作。问题如下:
我有一个名为 remove_duplicates_in_scraped_data.py 的 Python 程序。该程序在单个节点 (node=1xraspberry pi) 上执行,程序内部有一个多处理循环部分,如下所示:
pool = multiprocessing.Pool()
input_iter= product(FeaturesArray_1, FeaturesArray_2, repeat=1)
results = pool.starmap(refact_featureMatch, input_iter)
这个想法是,当它到达程序的那一部分时,它应该分配计算,迭代器中每个元素一个线程,最后合并结果。
因此,程序 remove_duplicates_in_scraped_data.py 运行s 一次(不是多次)并且它在池计算期间产生不同的线程。
在单台机器上(不使用 SLURM)它工作得很好,对于 raspberry pi 的特殊情况,它产生 4 个线程,进行计算,将其保存在结果中并继续程序作为一个线程。
我想利用 SLURM 集群的所有 16 个线程,但我似乎无法让它工作。而且我确信集群已正确配置,因为它可以 运行 在集群的所有 16 个线程中使用 SLURM 的所有多处理示例(例如计算 pi 的数字)。
现在,查看 sinfo -N -l
的 SLURM 配置,我们有:
NODELIST NODES PARTITION STATE CPUS S:C:T MEMORY TMP_DISK WEIGHT AVAIL_FE REASON
node01 1 picluster* idle 4 4:1:1 1 0 1 (null) none
node02 1 picluster* idle 4 4:1:1 1 0 1 (null) none
node03 1 picluster* idle 4 4:1:1 1 0 1 (null) none
node04 1 picluster* idle 4 4:1:1 1 0 1 (null) none
每个集群报告 4 个插槽,1 个核心和 1 个线程,就 SLURM 而言,有 4 个 CPU。
我希望利用所有 16 个 CPU,如果我 运行 我的程序为:
srun -N 4 -n 16 python3 remove_duplicates_in_scraped_data.py
它只会在每个节点中 运行 主程序的 4 个副本,从而产生 16 个线程。但这不是我想要的。我想要程序的单个实例,然后在集群中生成 16 个线程。至少我们知道 s运行 -N -n 16 集群可以工作。
因此,我尝试按如下方式更改程序:
#!/usr/bin/python3
#SBATCH -p picluster
#SBATCH --nodes=4
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=4
#SBATCH --ntasks-per-socket=1
#SBATCH --sockets-per-node=4
sys.path.append(os.getcwd())
...
...
...
pool = multiprocessing.Pool()
input_iter= product(FeaturesArray_1, FeaturesArray_2, repeat=1)
results = pool.starmap(refact_featureMatch, input_iter)
...
...
并用
执行
sbatch remove_duplicates_in_scraped_data.py
slurm 作业创建成功,我看到集群上所有节点都已分配
PARTITION AVAIL TIMELIMIT NODES STATE NODELIST
picluster* up infinite 4 alloc node[01-04]
该程序在 node01 上作为单个线程启动 运行,但是当它遇到并行部分时,它只在 node01 上生成 4 个线程,而在所有其他节点上什么也没有。
我尝试了不同的设置组合,甚至尝试通过脚本运行它
#!/bin/bash
#SBATCH -p picluster
#SBATCH --nodes=4
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=4
#SBATCH --ntasks-per-socket=1
#SBATCH --ntasks-per-core=1
#SBATCH --sockets-per-node=4
python3 remove_duplicates_in_scraped_data.py
但我无法让它在其他节点上生成。
你能帮帮我吗?
甚至可以这样做吗?即在集群的不同节点上使用 python 的多处理池?
如果没有,我还有哪些其他选择?
该集群还配置了 dask。这样效果会更好吗?
请帮忙,因为我真的被这个问题困住了。
谢谢
Python 多处理包仅限于共享内存并行化。它产生新进程,所有进程都可以访问单台机器的主内存。
您不能简单地将此类软件横向扩展到多个节点上。由于不同的机器没有可以访问的共享内存。
要运行 一次在多个节点上运行您的程序,您应该查看 MPI(消息传递接口)。还有一个python package。
根据您的任务,它也可能适合 运行 程序 4 次(因此每个节点一个作业)并让它处理数据的子集。这通常是更简单的方法,但并非总是可行。
所以,我 运行 DASK 与 SLURM 集群和 Python 脚本可以很好地并行化。这需要最少的代码更改。所以上面的multiprocessing pool代码改为:
cluster = SLURMCluster( header_skip=['--mem'],
queue='picluster',
cores=4,
memory='1GB'
)
cluster.scale(cores=16) #the number of nodes to request
dask_client = Client(cluster)
lazy_results=[]
for pair in input_iter:
res = dask_client.submit(refact_featureMatch, pair[0], pair[1])
lazy_results.append(res)
results = dask_client.gather(lazy_results)
当然可能有更好的方法通过 DASK 来做到这一点。我愿意接受建议:)
我正在尝试 运行 SLURM 集群 (4x raspberry Pi 3) 上的一个简单并行程序,但没有成功。我一直在阅读它,但我就是无法让它工作。问题如下:
我有一个名为 remove_duplicates_in_scraped_data.py 的 Python 程序。该程序在单个节点 (node=1xraspberry pi) 上执行,程序内部有一个多处理循环部分,如下所示:
pool = multiprocessing.Pool()
input_iter= product(FeaturesArray_1, FeaturesArray_2, repeat=1)
results = pool.starmap(refact_featureMatch, input_iter)
这个想法是,当它到达程序的那一部分时,它应该分配计算,迭代器中每个元素一个线程,最后合并结果。 因此,程序 remove_duplicates_in_scraped_data.py 运行s 一次(不是多次)并且它在池计算期间产生不同的线程。
在单台机器上(不使用 SLURM)它工作得很好,对于 raspberry pi 的特殊情况,它产生 4 个线程,进行计算,将其保存在结果中并继续程序作为一个线程。
我想利用 SLURM 集群的所有 16 个线程,但我似乎无法让它工作。而且我确信集群已正确配置,因为它可以 运行 在集群的所有 16 个线程中使用 SLURM 的所有多处理示例(例如计算 pi 的数字)。
现在,查看 sinfo -N -l
的 SLURM 配置,我们有:
NODELIST NODES PARTITION STATE CPUS S:C:T MEMORY TMP_DISK WEIGHT AVAIL_FE REASON
node01 1 picluster* idle 4 4:1:1 1 0 1 (null) none
node02 1 picluster* idle 4 4:1:1 1 0 1 (null) none
node03 1 picluster* idle 4 4:1:1 1 0 1 (null) none
node04 1 picluster* idle 4 4:1:1 1 0 1 (null) none
每个集群报告 4 个插槽,1 个核心和 1 个线程,就 SLURM 而言,有 4 个 CPU。
我希望利用所有 16 个 CPU,如果我 运行 我的程序为:
srun -N 4 -n 16 python3 remove_duplicates_in_scraped_data.py
它只会在每个节点中 运行 主程序的 4 个副本,从而产生 16 个线程。但这不是我想要的。我想要程序的单个实例,然后在集群中生成 16 个线程。至少我们知道 s运行 -N -n 16 集群可以工作。
因此,我尝试按如下方式更改程序:
#!/usr/bin/python3
#SBATCH -p picluster
#SBATCH --nodes=4
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=4
#SBATCH --ntasks-per-socket=1
#SBATCH --sockets-per-node=4
sys.path.append(os.getcwd())
...
...
...
pool = multiprocessing.Pool()
input_iter= product(FeaturesArray_1, FeaturesArray_2, repeat=1)
results = pool.starmap(refact_featureMatch, input_iter)
...
...
并用
执行sbatch remove_duplicates_in_scraped_data.py
slurm 作业创建成功,我看到集群上所有节点都已分配
PARTITION AVAIL TIMELIMIT NODES STATE NODELIST
picluster* up infinite 4 alloc node[01-04]
该程序在 node01 上作为单个线程启动 运行,但是当它遇到并行部分时,它只在 node01 上生成 4 个线程,而在所有其他节点上什么也没有。
我尝试了不同的设置组合,甚至尝试通过脚本运行它
#!/bin/bash
#SBATCH -p picluster
#SBATCH --nodes=4
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=4
#SBATCH --ntasks-per-socket=1
#SBATCH --ntasks-per-core=1
#SBATCH --sockets-per-node=4
python3 remove_duplicates_in_scraped_data.py
但我无法让它在其他节点上生成。
你能帮帮我吗? 甚至可以这样做吗?即在集群的不同节点上使用 python 的多处理池? 如果没有,我还有哪些其他选择? 该集群还配置了 dask。这样效果会更好吗?
请帮忙,因为我真的被这个问题困住了。
谢谢
Python 多处理包仅限于共享内存并行化。它产生新进程,所有进程都可以访问单台机器的主内存。
您不能简单地将此类软件横向扩展到多个节点上。由于不同的机器没有可以访问的共享内存。
要运行 一次在多个节点上运行您的程序,您应该查看 MPI(消息传递接口)。还有一个python package。
根据您的任务,它也可能适合 运行 程序 4 次(因此每个节点一个作业)并让它处理数据的子集。这通常是更简单的方法,但并非总是可行。
所以,我 运行 DASK 与 SLURM 集群和 Python 脚本可以很好地并行化。这需要最少的代码更改。所以上面的multiprocessing pool代码改为:
cluster = SLURMCluster( header_skip=['--mem'],
queue='picluster',
cores=4,
memory='1GB'
)
cluster.scale(cores=16) #the number of nodes to request
dask_client = Client(cluster)
lazy_results=[]
for pair in input_iter:
res = dask_client.submit(refact_featureMatch, pair[0], pair[1])
lazy_results.append(res)
results = dask_client.gather(lazy_results)
当然可能有更好的方法通过 DASK 来做到这一点。我愿意接受建议:)