python 多处理 pool.map() 等到方法完成

python multiprocessing pool.map() wait until method is done

我是 python 多处理的新手,在正式使用之前进行了一些试验。现在我正在使用 pool.map() ,它似乎工作得很好。但我想在继续我的程序的其余部分之前完成池作业:

import time
import multiprocessing as mp

res1 = []

def my_func(x):
    print(mp.current_process())
    res1.append(x**x)

def main():
    print(mp.cpu_count())
    pool = mp.Pool(mp.cpu_count())
    tic = time.time()
    pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222])
    toc = time.time()
    t = toc-tic
    print(t)

if __name__ == "__main__":
    main()

print('Hi')

现在在我的进程之间打印了“hi”,但我想让计算进程先完成,最后应该打印“hi”。

我尝试了 pool.close() 和 pool.join() 但在 my_func() 内部它似乎没有任何改变,在 main() 中说,它并没有改变外部知道我的池对象(当然因为它是在 main() 内部声明的)

我知道这只是一个测试程序,但我的论文中需要这个概念用于更大的项目。所以我很感激我能得到的每一个帮助。非常感谢!

几件事:

首先,您传递的 x**xx 的值是一个 非常大 的数字,需要相当长的时间来计算。在我有 8 个逻辑处理器 (cpu_count() returns 8) 的桌面上,map 函数需要 99 秒才能完成——但它确实完成了。

其次,每个进程附加到的全局变量 res1 对每个进程 是唯一的 。也就是说,每个进程在其自己的地址 space 中是 运行(这是多处理的 属性),因此有自己的 res1 副本,这就是为什么调用 map 后 return,主进程的 res1 将为空。相反,您的 工作函数 ,即 my_func,其结果应该 return,然后来自 map 的 return 值将是一个列表在所有 return 个值中。

第三,这是 属性 如何在 Windows 下创建进程,任何不在 if __name__ = "__main__": 块内的全局范围内的语句将由每个新执行在初始化时创建进程,这就是为什么您立即看到 Hi 打印 cpu_count() 次的原因。您应该在全局范围内删除对 print('Hi') 的调用,并将其放在 my_func 中(参见最后一个代码示例)。

因为人生苦短,我把my_func改成了return它的参数的平方:

import time
import multiprocessing as mp

def my_func(x):
    print(mp.current_process())
    return x * x

def main():
    print(mp.cpu_count())
    pool = mp.Pool(mp.cpu_count())
    tic = time.time()
    results = pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222])
    print('Hi')
    toc = time.time()
    t = toc-tic
    print(t)
    print(results)

if __name__ == "__main__":
    main()

打印:

8
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-1' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-1' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-1' parent=3232 started daemon>
Hi
0.17300081253051758
[19753082469136, 4938270617284, 11111108888889, 30864191358025, 11111108888889, 4938270617284, 1234567654321, 4938270617284]

您会注意到,尽管池大小为 8,但其中 7 个结果是由池中的单个进程处理的。其原因如下。要处理的 8 个“任务”,由传递给 map 方法的 iterable 中的 8 个数字表示,被放置在一个输入队列中,以“块”的形式处理”的一定规模。如果你没有指定 chunksize 参数,它默认为 None 并且 map 方法将根据 iterable 和池的大小计算一个合适的值尺寸。在这种情况下,将使用 chunksize 值 1。空闲进程将从输入队列中获取下一个任务块并执行该块。在这种情况下,池中的一个进程抓取一个块并快速执行它,以至于它能够返回并抓取下一个块并在池中的任何其他进程被分派到另一个处理器之前执行它。事实上,它还能再这样做 6 次。

通过调用 time.sleep 并确保使用 chunksize 值为 1 让 my_func 变得更有趣,我们确保我们给每个处理器一个获取任务的机会:

import time
import multiprocessing as mp
import time

def my_func(x):
    time.sleep(.1)
    print(mp.current_process())
    return x * x

def main():
    print(mp.cpu_count())
    pool = mp.Pool(mp.cpu_count())
    tic = time.time()
    results = pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222], chunksize=1)
    toc = time.time()
    t = toc-tic
    print(t)
    print(results)

if __name__ == "__main__":
    main()

print('Hi')

打印:

8
Hi
Hi
Hi
Hi
Hi
Hi
Hi
Hi
<SpawnProcess name='SpawnPoolWorker-1' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-4' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-2' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-6' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-7' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-5' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-8' parent=3116 started daemon>
0.28499770164489746
[19753082469136, 4938270617284, 11111108888889, 30864191358025, 11111108888889, 4938270617284, 1234567654321, 4938270617284]
Hi

Simalry,通过指定 chunksize=8,我们保证所有 8 个任务将由一个进程处理,而其他 7 个空闲:

import time
import multiprocessing as mp
import time

def my_func(x):
    time.sleep(.1)
    print(mp.current_process())
    print('Hi')
    return x * x

def main():
    print(mp.cpu_count())
    pool = mp.Pool(mp.cpu_count())
    tic = time.time()
    results = pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222], chunksize=8)
    toc = time.time()
    t = toc-tic
    print(t)
    print(results)

if __name__ == "__main__":
    main()

打印:

8
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
0.9750022888183594
[19753082469136, 4938270617284, 11111108888889, 30864191358025, 11111108888889, 4938270617284, 1234567654321, 4938270617284]

不用说,这是一个糟糕的 chunksize 值,用于此 iterable/pool 大小组合。