multiprocessing.Pool.apply 和 multiprocessing.Pool.apply_async 的用途

Purpose of multiprocessing.Pool.apply and multiprocessing.Pool.apply_async

示例和执行结果如下:

#!/usr/bin/env python3.4
from multiprocessing import Pool
import time
import os

def initializer():
    print("In initializer pid is {} ppid is {}".format(os.getpid(),os.getppid()))

def f(x):
    print("In f pid is {} ppid is {}".format(os.getpid(),os.getppid()))
    return x*x

if __name__ == '__main__':
    print("In main pid is {} ppid is {}".format(os.getpid(), os.getppid()))
    with Pool(processes=4, initializer=initializer) as pool:  # start 4 worker processes
        result = pool.apply(f, (10,)) # evaluate "f(10)" in a single process
        print(result)

        #result = pool.apply_async(f, (10,)) # evaluate "f(10)" in a single process
        #print(result.get())

给出:

$ ./pooleg.py
In main pid is 22783 ppid is 19542
In initializer pid is 22784 ppid is 22783
In initializer pid is 22785 ppid is 22783
In initializer pid is 22787 ppid is 22783
In f pid is 22784 ppid is 22783
In initializer pid is 22786 ppid is 22783
100

从输出中可以清楚地看出:创建了 4 个进程,但实际上只有一个进程完成了工作(称为 f)。

问题:当工作 f 仅由一个进程完成时,为什么我要创建一个超过 1 个工人的池并调用 apply()apply_async() 也是如此,因为在那种情况下,工作也只能由一名工人完成。

我不明白这些功能的用例。

代码中的这一行:

Pool(processes=4, initializer=initializer) as pool:  # start 4 worker processes

启动 4 个工作进程。它只是创建了一个池,而不是一次 运行 同时支持 运行 很多。像 apply() 这样的方法实际上启动了单独的进程 运行.

区别在于apply()apply_async()前者阻塞直到结果准备好,而后者returns立即"result"一个对象。除非您想一次向 Pool 提交多个任务(这当然是使用 multiprocessing 模块的全部意义),否则这没有太大区别。

这里是对您的代码的一些修改,展示了如何使用 Pool:

实际进行一些并发处理
from multiprocessing import Pool
import time
import os

def initializer():
    print("In initializer pid is {} ppid is {}".format(os.getpid(),os.getppid()))

def f(x):
    print("In f pid is {} ppid is {}".format(os.getpid(),os.getppid()))
    return x*x

if __name__ == '__main__':
    print("In main pid is {} ppid is {}".format(os.getpid(), os.getppid()))
    with Pool(processes=4, initializer=initializer) as pool:  # Create 4 worker Pool.
#        result = pool.apply(f, (10,)) # evaluate "f(10)" in a single process
#        print(result)
        # Start multiple tasks.
        tasks = [pool.apply_async(f, (val,)) for val in range(10, 20)]
        pool.close()  # No more tasks.
        pool.join()  # Wait for all tasks to finish.
        results = [result.get() for result in tasks]  # Get the result of each.
        print(results)

map_sync() 更适合处理这样的事情(一系列值),因为它会自动处理上面代码中显示的一些细节。

首先,两者都旨在对参数元组(单个函数调用)进行操作,这与对可迭代对象进行操作的 Pool.map 变体相反。因此,当您仅调用一次这些函数时观察到仅使用一个进程时,这不是错误。


您将使用 Pool.apply_async 而不是 Pool.map 版本之一,在该版本中您需要对要分发的单个任务进行更精细的控制。

Pool.map 版本采用可迭代并将它们分块到任务中,其中每个任务都有 相同(映射)目标函数。 Pool.apply_async 通常不会在 >1 个工作人员池中只调用一次。由于它是异步的,您可以迭代手动预先捆绑的任务并将它们提交给多个 工作进程在它们中的任何一个完成之前。您在此处的任务列表可以包含不同的目标函数,就像您在此答案 . It also allows registering callbacks for results and errors like in 示例中看到的那样。

这些属性使 Pool.apply_async 非常通用,并且是 无法使用 Pool.map 版本之一完成的问题场景的首选工具。


Pool.apply 乍一看(和第二眼)确实没有广泛使用。在这样的场景中,您可以使用它来同步控制流:您首先使用 apply_async 启动多个任务,然后有一个任务必须在您使用 apply_async 启动另一轮任务之前完成。

使用 Pool.apply 也可能只是意味着当您已经有一个当前空闲的池时,您可以为中间任务创建一个额外的进程。