Python 并行案例的多重处理
Python Multiprocessing of parallel cases
我有一组模拟,每个模拟都使用 MPI 来并行 运行(它们是 CFD 模拟)。但是,我想在 Python 和 运行 中并行创建一个任务池。我使用了如下多处理库:
import itertools
import multiprocessing
def Run_Cases(params):
run_each_mpi_simulation
a = range(1,len(U)); b = range(len(R))
paramlist = list(itertools.product(a,b))
pool = multiprocessing.Pool()
pool.map(Run_Case,paramlist)
所以基本上,代码创建任务池(模拟实例)并将它们分配给每个处理器 运行。但是,它没有考虑到每个模拟都需要 2 个处理器,因为每个案例都是并行 (MPI) 模拟。这导致模拟中的性能显着下降。
特此,我想知道是否有可能以某种方式定义分配给 Python 多处理包中每个任务的处理器数量?
非常感谢任何评论。
亲切的问候
阿什坎
EDIT/UPDATE:
非常感谢@AntiMatterDynamite 的回答。
我试过你的方法,处理器的性能和工作负载分配有了很大改善,但似乎有 2 个问题:
1) 虽然一切仍在继续,但我收到以下消息
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 1073, in run
self.function(*self.args, **self.kwargs)
File "/usr/local/lib/python2.7/dist-packages/PyFoam/Execution/FoamThread.py", line 86, in getLinuxMem
me=psutil.Process(thrd.threadPid)
File "/usr/local/lib/python2.7/dist-packages/psutil/__init__.py", line 341, in __init__
self._init(pid)
File "/usr/local/lib/python2.7/dist-packages/psutil/__init__.py", line 381, in _init
raise NoSuchProcess(pid, None, msg)
NoSuchProcess: psutil.NoSuchProcess no process found with pid 8880
非常感谢您的评论。
再次感谢
阿什坎
EDIT/UPDATE:
我认为该消息是因为进程数少于处理器列表。因为我有两个 cases/simulations,每个都使用 2 个处理器,所以当我使用超线程时,我有 8 个处理器,所以得到了消息。它是使用 4 个处理器或具有更大的模拟池解决的。
multiprocessing.Pool
接受要创建的进程数作为其第一个参数。
您可以使用 multiprocessing.cpu_count()
获取逻辑 cpu 内核的数量,然后在池中创建一半数量的进程(因此每个进程有 2 个)
multiprocessing.Pool(multiprocessing.cpu_count()/2)
这假设您的 cpu 计数相差 2,这对于几乎所有 cpu 都是正确的...
请注意,此解决方案不考虑 SMT(或超线程),因为 multiprocessing.cpu_count()
计算 逻辑 内核,因此它可能会报告双倍的物理内核。对于大多数 cpu 密集型任务,SMT 是一种性能提升,您 运行 将任务加倍但速度减半,如果您有 SMT,则需要确定它是否适合您的模拟
最后,您还可以设置每个进程的关联性,使其只能 运行 在 2 个核心上
但是没有直接的标准方法来执行此操作,因为多处理不会公开它打开的进程的 PID。这里有一些粗略的完整代码,可以为每个进程设置亲和力:
import multiprocessing
import psutil
import itertools
cores_per_process = 2
cpu_count = multiprocessing.cpu_count()
manager = multiprocessing.Manager()
pid_list = manager.list() # trick to find pid of all the processes in the pool
cores_list = range(cpu_count)
it = [iter(cores_list)] * cores_per_process # this is a python trick to group items from the same list together into tuples
affinity_pool = manager.list(zip(*it)) # list of affinity
def Run_Case(params):
self_proc = psutil.Process() # get your own process
if self_proc.pid not in pid_list:
pid_list.append(self_proc.pid) # found new pid in pool
self_proc.cpu_affinity(affinity_pool.pop()) # set affinity from the affinity list but also remove it so other processes can't use the same affinity
#run simulation
a = range(1, len(U))
b = range(len(R))
paramlist = list(itertools.product(a, b))
pool = multiprocessing.Pool(cpu_count/cores_per_process) # decide on how many processes you want
pool.map(Run_Case, paramlist)
我有一组模拟,每个模拟都使用 MPI 来并行 运行(它们是 CFD 模拟)。但是,我想在 Python 和 运行 中并行创建一个任务池。我使用了如下多处理库:
import itertools
import multiprocessing
def Run_Cases(params):
run_each_mpi_simulation
a = range(1,len(U)); b = range(len(R))
paramlist = list(itertools.product(a,b))
pool = multiprocessing.Pool()
pool.map(Run_Case,paramlist)
所以基本上,代码创建任务池(模拟实例)并将它们分配给每个处理器 运行。但是,它没有考虑到每个模拟都需要 2 个处理器,因为每个案例都是并行 (MPI) 模拟。这导致模拟中的性能显着下降。
特此,我想知道是否有可能以某种方式定义分配给 Python 多处理包中每个任务的处理器数量?
非常感谢任何评论。
亲切的问候 阿什坎
EDIT/UPDATE:
非常感谢@AntiMatterDynamite 的回答。
我试过你的方法,处理器的性能和工作负载分配有了很大改善,但似乎有 2 个问题:
1) 虽然一切仍在继续,但我收到以下消息
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 1073, in run
self.function(*self.args, **self.kwargs)
File "/usr/local/lib/python2.7/dist-packages/PyFoam/Execution/FoamThread.py", line 86, in getLinuxMem
me=psutil.Process(thrd.threadPid)
File "/usr/local/lib/python2.7/dist-packages/psutil/__init__.py", line 341, in __init__
self._init(pid)
File "/usr/local/lib/python2.7/dist-packages/psutil/__init__.py", line 381, in _init
raise NoSuchProcess(pid, None, msg)
NoSuchProcess: psutil.NoSuchProcess no process found with pid 8880
非常感谢您的评论。
再次感谢 阿什坎
EDIT/UPDATE:
我认为该消息是因为进程数少于处理器列表。因为我有两个 cases/simulations,每个都使用 2 个处理器,所以当我使用超线程时,我有 8 个处理器,所以得到了消息。它是使用 4 个处理器或具有更大的模拟池解决的。
multiprocessing.Pool
接受要创建的进程数作为其第一个参数。
您可以使用 multiprocessing.cpu_count()
获取逻辑 cpu 内核的数量,然后在池中创建一半数量的进程(因此每个进程有 2 个)
multiprocessing.Pool(multiprocessing.cpu_count()/2)
这假设您的 cpu 计数相差 2,这对于几乎所有 cpu 都是正确的...
请注意,此解决方案不考虑 SMT(或超线程),因为 multiprocessing.cpu_count()
计算 逻辑 内核,因此它可能会报告双倍的物理内核。对于大多数 cpu 密集型任务,SMT 是一种性能提升,您 运行 将任务加倍但速度减半,如果您有 SMT,则需要确定它是否适合您的模拟
最后,您还可以设置每个进程的关联性,使其只能 运行 在 2 个核心上 但是没有直接的标准方法来执行此操作,因为多处理不会公开它打开的进程的 PID。这里有一些粗略的完整代码,可以为每个进程设置亲和力:
import multiprocessing
import psutil
import itertools
cores_per_process = 2
cpu_count = multiprocessing.cpu_count()
manager = multiprocessing.Manager()
pid_list = manager.list() # trick to find pid of all the processes in the pool
cores_list = range(cpu_count)
it = [iter(cores_list)] * cores_per_process # this is a python trick to group items from the same list together into tuples
affinity_pool = manager.list(zip(*it)) # list of affinity
def Run_Case(params):
self_proc = psutil.Process() # get your own process
if self_proc.pid not in pid_list:
pid_list.append(self_proc.pid) # found new pid in pool
self_proc.cpu_affinity(affinity_pool.pop()) # set affinity from the affinity list but also remove it so other processes can't use the same affinity
#run simulation
a = range(1, len(U))
b = range(len(R))
paramlist = list(itertools.product(a, b))
pool = multiprocessing.Pool(cpu_count/cores_per_process) # decide on how many processes you want
pool.map(Run_Case, paramlist)