Python 并行案例的多重处理

Python Multiprocessing of parallel cases

我有一组模拟,每个模拟都使用 MPI 来并行 运行(它们是 CFD 模拟)。但是,我想在 Python 和 运行 中并行创建一个任务池。我使用了如下多处理库:

import itertools
import multiprocessing

def Run_Cases(params):
    run_each_mpi_simulation

a = range(1,len(U)); b = range(len(R))
paramlist = list(itertools.product(a,b))
pool = multiprocessing.Pool()
pool.map(Run_Case,paramlist)

所以基本上,代码创建任务池(模拟实例)并将它们分配给每个处理器 运行。但是,它没有考虑到每个模拟都需要 2 个处理器,因为每个案例都是并行 (MPI) 模拟。这导致模拟中的性能显着下降。

特此,我想知道是否有可能以某种方式定义分配给 Python 多处理包中每个任务的处理器数量?

非常感谢任何评论。

亲切的问候 阿什坎

EDIT/UPDATE:

非常感谢@AntiMatterDynamite 的回答。

我试过你的方法,处理器的性能和工作负载分配有了很大改善,但似乎有 2 个问题:

1) 虽然一切仍在继续,但我收到以下消息

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 1073, in run
    self.function(*self.args, **self.kwargs)
  File "/usr/local/lib/python2.7/dist-packages/PyFoam/Execution/FoamThread.py", line 86, in getLinuxMem
    me=psutil.Process(thrd.threadPid)
  File "/usr/local/lib/python2.7/dist-packages/psutil/__init__.py", line 341, in __init__
    self._init(pid)
  File "/usr/local/lib/python2.7/dist-packages/psutil/__init__.py", line 381, in _init
    raise NoSuchProcess(pid, None, msg)
NoSuchProcess: psutil.NoSuchProcess no process found with pid 8880

非常感谢您的评论。

再次感谢 阿什坎

EDIT/UPDATE:

我认为该消息是因为进程数少于处理器列表。因为我有两个 cases/simulations,每个都使用 2 个处理器,所以当我使用超线程时,我有 8 个处理器,所以得到了消息。它是使用 4 个处理器或具有更大的模拟池解决的。

multiprocessing.Pool 接受要创建的进程数作为其第一个参数。 您可以使用 multiprocessing.cpu_count() 获取逻辑 cpu 内核的数量,然后在池中创建一半数量的进程(因此每个进程有 2 个)

 multiprocessing.Pool(multiprocessing.cpu_count()/2)

这假设您的 cpu 计数相差 2,这对于几乎所有 cpu 都是正确的...

请注意,此解决方案不考虑 SMT(或超线程),因为 multiprocessing.cpu_count() 计算 逻辑 内核,因此它可能会报告双倍的物理内核。对于大多数 cpu 密集型任务,SMT 是一种性能提升,您 运行 将任务加倍但速度减半,如果您有 SMT,则需要确定它是否适合您的模拟

最后,您还可以设置每个进程的关联性,使其只能 运行 在 2 个核心上 但是没有直接的标准方法来执行此操作,因为多处理不会公开它打开的进程的 PID。这里有一些粗略的完整代码,可以为每个进程设置亲和力:

import multiprocessing
import psutil
import itertools

cores_per_process = 2
cpu_count = multiprocessing.cpu_count()

manager = multiprocessing.Manager()
pid_list = manager.list()  # trick to find pid of all the processes in the pool

cores_list = range(cpu_count)
it = [iter(cores_list)] * cores_per_process  # this is a python trick to group items from the same list together into tuples
affinity_pool = manager.list(zip(*it))  # list of affinity

def Run_Case(params):
    self_proc = psutil.Process()  # get your own process
    if self_proc.pid not in pid_list:
        pid_list.append(self_proc.pid)  # found new pid in pool
        self_proc.cpu_affinity(affinity_pool.pop())  # set affinity from the affinity list but also remove it so other processes can't use the same affinity
    #run simulation

a = range(1, len(U))
b = range(len(R))
paramlist = list(itertools.product(a, b))
pool = multiprocessing.Pool(cpu_count/cores_per_process)  # decide on how many processes you want
pool.map(Run_Case, paramlist)