pool.map 对象很大时冻结

pool.map freezing when object is large

我正在使用 pool.map 来填充一个名为节点的字典。需要明确的是:这个字典是在 pool.map 运行 之后填充的,因此在进程之间共享变量不是一个问题。函数 returns 中的所有内容以及字典中的所有内容都是可腌制的。它正在填充本质上是图形的字典。当我深入 1、2、3 填充此图时,程序 运行 完美无缺。然而在 4 层:程序似乎没有崩溃,只是死机了。我在我要映射到的函数中设置了打印语句,并在其 运行 的最后将语句打印在程序的最顶部,然后冻结。这是我调用 pool.map:

的方式
    currentNode = startingNode
    nodesPopulated = [currentNode]
    connections = []
    merger = []
    pool = Pool(cpu_count())
    for currentDepth in range(1, depth):
        print('=' * 70)
        print("=  At depth", currentDepth)
        connections = []
        for item in nodesPopulated:
            if item != None:
                if item.isPopulated():
                    connections +=list(item.getConnections().values())
        print("=  Current number of connections:",len(connections))
        print("=  Current number of NodesPopulated in this iteration: ",len(nodesPopulated))
        print("=  Total number of nodes",len(self.nodes.keys()))
        nodesPopulated = pool.map(self.populateTopicNode, connections)
        print('\n=  Successfully populated another round of nodes')
        for node in nodesPopulated:
            if node != None:
                if item.isPopulated():
                    self.nodes[node.getTopic().getName()] = node
                #self.populatedNodes[node.getTopic().getName()] = True;
        print('=  Updated self.nodes\n')

    pool.close()
    pool.join()

    print('\nCount = ',len(list(self.nodes.keys())))
    return

再一次,我确保返回到 nodesPopulated 的所有内容都是可腌制的。我束手无策,因为 运行 这个程序 4 深需要大约 2 个小时,而没有 pool.map 它可以完美运行,但需要大约 6 个小时。我不想放弃多处理,但我无法弄清楚这一点,调试需要很长时间。它在冻结之前打印的最后一件事是 'D',它位于 self.populateTopicNode 的顶部。我还认为对象变得太大(self.nodes 和连接)可能是冻结的原因。

注意: 我确定这是一个多处理问题,因为我 运行 这个确切的代码没有使用 pool.map 并将它替换为 for 循环并且它 运行 完成没有错误。所以有些东西导致 pool.map 冻结。没有错误消息只是在第一次引用函数参数时挂起。这是'populateTopicNode'的前几行:

def populateTopicNode(self, node: TopicNode):
    print('D')
    if(node.isPopulated()):
        return None

冻结前在控制台上看到的最后一件事是 'D'

编辑: 我做了一些测试来给你它挂起的确切数字:

它挂起使用了大约 1300 MB 的内存。

编辑2:

好的,所以我发现它返回的东西不只是悬挂 运行domly。它 returns None 然后挂起。我不确定为什么,因为有很多时候它 returns None 并且工作正常。我还尝试将我的函数包装起来,看看是否将异常返回给父级是否吓坏了,这也不是问题。没有异常被捕获,它是 运行 它 returns 的一个点。它只是在返回后挂起。

编辑 3:

它每次迭代都在相同的确切位置中断。我打印它正在处理的当前主题的名称,它总是在同一行的同一位置中断,然后挂起。我不确定这是否有帮助,但这是附加信息。始终在同一时间中断。

来自 multiprocessing 指南。

As far as possible one should try to avoid shifting large amounts of data between processes.

multiprocessing.Pool 依靠锁定的缓冲区(OS 管道)在工作人员之间分配任务并检索他们的结果。如果将大于缓冲区的对象推入管道,则逻辑可能会挂起。

我建议您将作业转储到文件(例如使用 pickle)并将文件名发送到子进程。这样每个进程都可以独立地检索数据。您不仅可以防止您的逻辑卡住,而且您会注意到速度的提高以及管道成为您设计中的严重瓶颈。

我最近有一个案例需要提高多处理的速度。我设法通过使用基于迭代器的方法并使用 imap 而不是 map 来提高我的速度。 Imap 采用迭代器并迭代迭代器。 Map 实际上首先将第二个参数转换为列表,然后将其传递给它的 worker。这可能是您代码的瓶颈,但我不能百分百确定。

使用 imap 和迭代器至少会节省内存,也许会提高速度,也许还能解决您的崩溃问题。

我提出这样的建议。

from itertools import chain

connections = chain(map(
         lambda item: item.getConnections.values(), 
         filter(lambda item : item != None and item.isPopulated(), nodesPopulated)
         ))

p.imap(self.populateTopicNode, connections, chunksize=1024)

注意:您应该测试我是否真的正确地将连接正确地转换为迭代器。

注2:p.imap实际上是returns一个迭代器。因此,该函数本身是非阻塞的,但是当您在 for 循环中迭代它的值时,它会阻塞(等待输出)。这可能会或可能不会是你想要的,这取决于你的功能可能产生的副作用。否则在循环之前将其转换为列表。

注3:Chunksize 是发送给worker 的每个chunk 的大小。如果这个值太低,就会有太多对新块的请求被发送到主进程,从而造成瓶颈。如果这个值太高,您可以一次将太多内容强加给工人。或者你最终创建了闲置的工人,因为一两个工人仍在忙于他们的工作。如果您使用的函数在所有任务中的结束时间大致相同,则最佳块大小将是任务数量除以工作人员 (CPU) 数量。这样每个 worker 只请求一次块。 Chunksize 也是 p.map 中的一个参数,所以也许它现在已经适用于您的代码。

注 4:还有 imap_unorded 做同样的事情,但是返回的迭代器 returns 每当有东西准备就绪时都会输出,因此它可能是 "unordered"。改用可能会很有趣,但在我的例子中,整个迭代花费了比 imap 更多的时间。

注意5:就像我说的,我不知道你的程序为什么挂起,这种方法有助于加快我的程序,但你的程序可能会挂起,这取决于其他原因。 Noxdafox 方法可能是一个更好的解决方案,因为您确实排除了主要流程作为瓶颈的可能性,但我认为,如果这种方法可行,那将是一个更优雅的解决方案。