了解一个简单的多处理脚本

Understanding a simple multiprocessing script

我想了解 Python 的多处理模块是如何工作的。为此,我制作了一个非常简单的代码版本,并尝试使其并行运行。根据我的阅读,使用 pool 比使用 mp.Process 更适合我的程序。

下面是我想出的:

import time, os
import multiprocessing as mp

class Foo:
    def __init__(self, ID):
        self.ID = ID

    def showID(self):
        for k in range(0,4):
            print('Foo #', self.ID, '\tID:', os.getpid(), '\tParent ID:', os.getppid())
            time.sleep(0.2)

# MAIN
if __name__ == '__main__':

    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print(' ')

    foos = [Foo(2), Foo(3)]

    pool = mp.Pool(processes=2)

    # Code below doesn't work
    pool.apply_async(foos[0].showID, ())
    pool.apply_async(foos[1].showID, ())

列表foos最终将包含 10 到 20 个对象。 Foo.showID 方法最终也会 return 一些东西。我的目标是在需要 运行 的时候发送尽可能多的任务(foos 成员),这样他们就可以被分派到 pool 的进程之一。

如果我运行上面的代码,没有任何反应,即。只显示开头的parent processprocess id。如果我将最后两行替换为:

pool.apply_async(foos[0].showID())
pool.apply_async(foos[1].showID())

两者都在主进程中依次执行:

parent process: 3380
process id: 6556

Foo # 2         ID: 6556        Parent ID: 3380
Foo # 2         ID: 6556        Parent ID: 3380
Foo # 2         ID: 6556        Parent ID: 3380
Foo # 2         ID: 6556        Parent ID: 3380
Foo # 3         ID: 6556        Parent ID: 3380
Foo # 3         ID: 6556        Parent ID: 3380
Foo # 3         ID: 6556        Parent ID: 3380
Foo # 3         ID: 6556        Parent ID: 3380

最后,如果我用这样的东西替换它们:

pool.apply_async(foos[0].showID, ())
pool.apply_async(foos[1].showID())

我得到了预期的行为(我认为):

parent process: 3380
process id: 4772

Foo # 3         ID: 4772        Parent ID: 3380
Foo # 2         ID: 6364        Parent ID: 4772
Foo # 3         ID: 4772        Parent ID: 3380
Foo # 2         ID: 6364        Parent ID: 4772
Foo # 3         ID: 4772        Parent ID: 3380
Foo # 2         ID: 6364        Parent ID: 4772
Foo # 3         ID: 4772        Parent ID: 3380
Foo # 2         ID: 6364        Parent ID: 4772

这里发生了什么?如果我尝试使用未在 Foo class.

中定义的函数,我会注意到相同的行为

apply_async接收函数

当你使用不带括号的 foos[0].showID 时,你是在传递函数,而不是调用它,但是在执行

pool.apply_async(foos[0].showID())

您首先评估 foos[0].showID(),然后将其 return 值作为参数传递给 apply_async。最终进行评估的是 apply_async 的调用者,这是同步处理。

相当于做:

foos[0].showID()
pool.apply_async()
foos[1].showID()
pool.apply_async()

您的第一次尝试失败,因为您没有等待异步调用执行。打电话后.

pool.apply_async(foos[0].showID, ())
pool.apply_async(foos[1].showID, ())

您的程序已退出,因此您无需等待输出。

终于

pool.apply_async(foos[0].showID, ())
pool.apply_async(foos[1].showID())

相当于:

pool.apply_async(foos[0].showID, ())
foos[1].showID()

进行一次异步调用和一次同步调用,所以它有点工作。

您原始代码的问题是您没有等待子进程完成工作。您安排了工作项并退出导致池杀死了工人。保留计划作业时返回的结果对象并等待它们完成。

import time, os
import multiprocessing as mp

class Foo:
    def __init__(self, ID):
        self.ID = ID

    def showID(self):
        for k in range(0,4):
            print('Foo #', self.ID, '\tID:', os.getpid(), '\tParent ID:', os.getppid())
            time.sleep(0.2)

# MAIN
if __name__ == '__main__':

    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print(' ')

    foos = [Foo(2), Foo(3)]

    pool = mp.Pool(processes=2)

    results = []
    results.append(pool.apply_async(foos[0].showID, ()))
    results.append(pool.apply_async(foos[1].showID, ()))

    for result in results:
        result.wait()