pool.map() 如何在内部分配工作?

How pool.map() allocates the work internally?

我对 multiprocessing 库还很陌生,当与 map() 一起使用时,我对它的 Pool 模块有疑问。假设我有 4 个工作线程和 6 个任务要完成。我所做的是(使用 multiprocessing.dummy 因为我想生成线程而不是进程)

from multiprocessing.dummy import Pool as ThreadPool

def print_it(num):
    print num

def multi_threaded():
    tasks = [1, 2, 3, 4, 5, 6]
    pool = ThreadPool(4)
    r = pool.map(print_it, tasks)
    pool.close()
    pool.join()

multi_threaded()

我想了解 Pool.map() 是如何处理任务的?三个选项:

  1. 它是否首先生成 4 个线程,完成前 4 个任务并让线程死亡。然后为剩余的任务生成 2 个新线程?
  2. 它是否生成 4 个线程,为它们分配 4 个任务,一旦某个线程完成其任务,就将新任务分配给同一个线程。
  3. 其他方式。

这种见解会很有帮助,因为它会帮助我考虑在产品中更有效地使用 Pool.map()

这取决于您如何定义池。

正如您在示例中所做的那样,您的 (2) 发生了。初始化 Pool 后,依赖于 Pool 的线程或进程就会启动(发生在 Pool__init__() - 无需为此提交任务),它们就坐在那里等待任务。当一个任务到达并被执行时,线程或进程不会退出,它们只是回到等待状态等待更多的工作到来。

不过,您可以将其定义为不同的工作方式。您可以将 maxtasksperchild 参数添加到池中。一旦一个worker完成了这个数量的任务,它就会退出,并立即启动一个新的worker(不需要先给它一个任务,一个worker一退出就启动)。这是在 Pool class Pool._maintain_pool()Pool._repopulate_pool() 函数中管理的。

如果您希望您的工作人员在一开始就启动并且 运行 无限期地启动,请按照您现在所做的去做,这就是发生的事情。如果您希望您的工作人员在开始时启动但在完成一系列任务(如果需要甚至一个)后退出并自行更新,请使用 maxtasksperchild。如果您不想在需要之前启动进程或线程,请不要使用 Pool。在需要时启动线程或进程并自行管理。

希望这对您有所帮助。