Python 中具有动态生成的 MPI 深度优先搜索

MPI Depth-first search with dynamic spawns in Python

好的,所以我想在树状结构中进行多线程深度优先搜索。我为此使用了来自集群中多台计算机的线程(本例中为 localhost 四核和 raspberry pi 2)。主线程应该启动这个过程,并且在树中的第一次分裂时,对于它分裂成的每个节点,它应该产生一个新线程。然后这些线程应该能够将他们的发现报告给主控。

我正在尝试动态执行此操作,而不是为 mpiexec 提供多个线程,因为我事先不知道树的外观(例如,可能有 2 或 9 个拆分)。

我从我正在处理的项目中为这个问题做了一个样本,我让它按如下方式工作。它从一串数字中取出一个数字,并为每个数字生成一个线程并将该数字发送到该线程。

为高手:

#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy, time

################ Set up MPI variables ################

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
status = MPI.Status()

################ Master code ################

script = 'cpi.py'
for d in '34':
   try:
       print 'Trying to spawn child process...'
       icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
       spawnrank = icomm.Get_rank()
       icomm.send(d, dest=spawnrank, tag=11)
       print 'Spawned rank %d.' % spawnrank    
   except: ValueError('Spawn failed to start.')

solved = False
while solved == False:
    #while not comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG):
    #    print 'spawns doing some work...'
    #    time.sleep(1)
solved = comm.recv(source=MPI.ANY_SOURCE, tag=22)
print 'received solution: %d' % solved

它正确地产生了工人,他们收到了数字但不把它发回给主人。工人的代码如下:

工人

#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy

################ Set up MPI variables ################

icomm = MPI.Comm.Get_parent()
comm = MPI.COMM_WORLD
irank = comm.Get_rank()
rank = comm.Get_rank()

running = True
while running:
    data = None
    data = icomm.recv(source=0, tag=11)
    if data:
        print 'Trying to send %s from worker rank %d to %d' % (data, rank, irank)
        icomm.send(data, dest=0, tag=22)
        break
print 'Worker on rank %d done.' % rank
icomm.Disconnect()

它永远不会到达主代码的最后一行。我还在主代码处添加(注释掉)一个探测器,以检查标记为 22 的消息是否在某处徘徊,排除 recv 函数中的错误,但探测器从未找到该消息。所以我假设它永远不会发送。

我通过打印两个进程的等级得出 它们都使用等级 0 这是有道理的,因为它们是在同一台计算机上产生的。但是当我添加一个主机文件和排名文件时,试图强制它为奴隶使用不同的计算机,它给了我以下错误:

[hch-K55A:06917] *** Process received signal ***
[hch-K55A:06917] Signal: Segmentation fault (11)
[hch-K55A:06917] Signal code: Address not mapped (1)
[hch-K55A:06917] Failing at address: 0x3c
[hch-K55A:06917] [ 0] /lib/x86_64-linux-gnu/libpthread.so.0(+0x10340) [0x7f2c0d864340]
[hch-K55A:06917] [ 1] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(orte_rmaps_rank_file_lex+0x4a0) [0x7f2c0abdcb70]
[hch-K55A:06917] [ 2] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(+0x23ac) [0x7f2c0abda3ac]
[hch-K55A:06917] [ 3] /usr/lib/libopen-rte.so.4(orte_rmaps_base_map_job+0x2e) [0x7f2c0dacd05e]
[hch-K55A:06917] [ 4] /usr/lib/libopen-rte.so.4(orte_plm_base_setup_job+0x5a) [0x7f2c0dac580a]
[hch-K55A:06917] [ 5] /usr/lib/openmpi/lib/openmpi/mca_plm_rsh.so(orte_plm_rsh_launch+0x338) [0x7f2c0b80a8c8]
[hch-K55A:06917] [ 6] /usr/lib/libopen-rte.so.4(+0x51ff4) [0x7f2c0dac3ff4]
[hch-K55A:06917] [ 7] /usr/lib/libopen-rte.so.4(opal_event_base_loop+0x31e) [0x7f2c0dae9cfe]
[hch-K55A:06917] [ 8] mpiexec() [0x4047d3]
[hch-K55A:06917] [ 9] mpiexec() [0x40347d]
[hch-K55A:06917] [10] /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf5) [0x7f2c0d4b0ec5]
[hch-K55A:06917] [11] mpiexec() [0x403399]
[hch-K55A:06917] *** End of error message ***
Segmentation fault (core dumped)

使用的命令:mpiexec -np 1 --hostfile hostfile --rankfile rankfile python spawntest.py

主机文件: 本地主机 本地主机插槽=1 最大插槽=4 pi2@raspi2 插槽=4

排名文件: 等级 0=本地主机插槽=1 等级 1=pi2@raspi2 插槽=1-4

所以我的问题如下;如何在主计算机以外的计算机上生成这些线程,同时能够来回发送数据?

你的主人的代码非常错误,我觉得你对那里发生的事情缺乏一些概念性的理解。

MPI_COMM_SPAWN(或其对应的 mpi4py comm.Spawn())生成的作业中的 MPI 进程不会成为 parent 的 MPI_COMM_WORLD 的一部分。生成的进程形成一个完全独立的世界通信器,并通过 内部通信器 与 parent 作业交互link,这正是生成 return秒。在您的情况下, icomm = MPI.COMM_SELF.Spawn(...) 是主进程中的内部通信器句柄。 child 作业中的进程使用 MPI_COMM_GET_PARENT(mpi4py 中的 MPI.Comm.Get_parent())获取内部通信器句柄。由于您正在产生 single-process 个职位:

MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
                                                   ^^^^^^^^^^

child 工作的新形成的世界通信器中只有一个进程,因此 MPI.COMM_WORLD.Get_rank() return 每个工人中的零。

你的主人代码的这一部分是错误的,但由于内部通信器的实际工作方式,它仍然有效:

spawnrank = icomm.Get_rank() # <--- not what you expect
icomm.send(d, dest=spawnrank, tag=11)

Intercommunicators link 两组独立的进程。其中一个称为本地组,另一个称为远程组。在对讲机上使用 MPI_COMM_RANK (comm.Get_rank()) 时,您将获得调用进程在 本地组 中的排名。但是,在发送或接收时,指定的等级与 远程组 相关。在您的情况下,产生一个新的工作人员会导致以下内部通信器:

    mastet's MPI_COMM_SELF           child's MPI_COMM_WORLD
              |                                |
+=============|================================|=============+
|  +----------V----------+       +-------------V----------+  |
|  | group of the master |       | group of the child job |  |
|  |        [ 0 ]        |       |          [ 0 ]         |  |
|  +---------------------+       +------------------------+  |
|                    intercommunicator                       |
+============================================================+

(上面的通讯器显示了每个组的来源;通讯器本身不是互通器的一部分)

哪个组是本地的,哪个是远程的,取决于调用进程属于哪个组。主进程的本地组是 child 作业中等级的远程组,反之亦然。这里重要的是,每个组的等级为 0,因为一组中至少有一个进程。您很幸运,master 组中只有一个进程,因此 icomm.Get_rank() returns 0(并且它将始终 return 零,因为 master 的本地组派生自 MPI_COMM_SELF,它总是包含一个进程),它恰好(总是)是远程(child)组中的有效排名。正确的做法是将消息发送到您知道远程组中存在的固定等级,例如等级 0:

   icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
   icomm.send(d, dest=0, tag=11)

(此代码明确发送到远程组的排名 0,而之前的值 0 只是一个幸运的巧合)

也就是说,发送部分 - 虽然不正确 - 仍然有效。接收部分没有,有几个原因。首先,您使用了错误的通信器 - 从 MPI_COMM_WORLD 接收不起作用,因为 child 进程不是它的成员。事实上,MPI 中的通信器是不可变的——您不能在不创建新通信器的情况下添加或删除等级。您应该使用 icomm 从工作人员那里接收,就像您使用它向他们发送一样。现在,出现了第二个问题 - master 中的 icomm 被每个新的 Spawn 覆盖,因此你实际上失去了与除最后一个以外的任何 child 作业进行通信的能力。您需要保留句柄列表并将句柄附加到它。

接收部分有点复杂。没有 MPI_ANY_COMM - 您无法进行涵盖所有 child 作业的接收操作,因为它们都位于各自的内部通信器中。您应该使用 MPI_IPROBE 在内部通信器列表上循环,或者(更好)从每个 child 开始接收 non-blocking 然后使用 MPI_WAIT_SOME (无论 mpi4py 等价物是什么)。

有了循环,主代码应该看起来像这样(注意 - 未经测试的代码,我没有 and/or 使用 mpi4py):

#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy, time

################ Set up MPI variables ################

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
status = MPI.Status()

################ Master code ################

icomms = []
script = 'cpi.py'
for d in '34':
   try:
       print 'Trying to spawn child process...'
       icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
       icomm.send(d, dest=0, tag=11)
       icomms.append(icomm)
       print 'Spawned a child.'
   except: ValueError('Spawn failed to start.')

solved = False
while not solved and icomms:
    for icomm in icomms:
        if icomm.Iprobe(source=0, tag=MPI.ANY_TAG):
            print 'A child responded...'
            solved = icomm.recv(source=0, tag=MPI.ANY_TAG)
            icomm.Disconnect()
            icomms.remove(icomm)
            if solved: break
    if not solved:
        print 'spawns doing some work...'
        time.sleep(1)
# make sure all pending sends get matched
for icomm in icomms:
    icomm.recv(source=0, tag=MPI.ANY_TAG)
    icomm.Disconnect()
print 'received solution: %d' % solved

希望你明白了。

补充:如果您从一个生成的作业中生成一个作业,新的 child 无法轻易地与 top-level 主服务器建立连接。为此,您应该求助于 MPI-2 client/server 模型支持的一个模糊部分,并让主机使用 MPI_PORT_OPEN 打开一个端口,然后使用 MPI_PUBLISH_NAME 向 MPI 命名服务注册它,最后使用 MPI_COMM_ACCEPT 接收来自任何其他 MPI 作业的连接。 workers 应该使用 MPI_LOOKUP_NAME 获取对端口的引用,并使用 MPI_COMM_CONNECT 与 master job 建立互通器。我不知道 mpi4py 中是否存在这些函数的包装器,如果存在,它们是如何命名的。