在多处理中维护实例状态 apply_async

Maintain instance state in multiprocessing apply_async

我预计,如果我在实例方法中调用 apply_async 并获得其结果,所做的任何更改都将保留在分叉进程中。但是,似乎每次对 apply_async 的新调用都会创建该实例的新副本。

取以下代码:

from multiprocessing.pool import Pool


class Multitest:
    def __init__(self):
        self.i = 0

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(10):
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        print("i", self.i)
        self.i += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()

示例输出:

i 0
i 0
i 0
i 0
i 0
input 0
i 0
i 0
i 0
i 0
i 0
input 1
input 2
input 3
input 4
input 5
input 6
input 7
input 8
input 9

但由于我们有两个核心,其中分布着 10 个输入,我曾预计 i 属性会增加。

我曾预料到以下流程:

注意:我不是询问两个进程之间的共享状态。相反,我问的是为什么单个进程的 class 实例没有发生变化(为什么每个进程的 self.i 没有增加)。

但是,我没有看到这种行为。相反,打印输出只有零,表明我的预期是错误的:状态 (属性 i) 没有被维护,但是一个新的实例(或者至少是一个新的副本)在每个致电 apply_async。我在这里缺少什么,我怎样才能按预期进行这项工作? (最好使用 apply_async,虽然不是必需的。但是结果的顺序应该保持不变。)

据我所知,此行为并非特定于 apply_async,也特定于其他 pool 方法。我有兴趣了解 为什么 会发生这种情况以及 如何 可以将行为更改为我想要实现的行为。赏金转到可以为两个查询提供答案的答案。

我想向您指出参考资料,但我还没有,所以我将根据经验证据分享我的想法:

每次调用 apply_async 都会准备命名空间的新副本。您可以通过在进程内部添加对 print(self) 的调用来查看这一点。所以这部分是不正确的:

main thread distributes work ... by initializing two new processes and a copy of the original Multitest instance

相反,有两个新进程和 个原始 Multitest 实例的副本。所有这些副本都是从主进程制作的,它没有增加 i 的副本。为了证明这一点,在调用 apply_async 之前添加 time.sleep(1); self.i += 1,并注意 a) 主线程中 i 的值递增,以及 b) 通过延迟 for 循环,原始 Multitest 实例具有到下一次调用 apply_async 触发新副本时更改。

代码:

from multiprocessing.pool import Pool
import time

class Multitest:
    def __init__(self):
        print("Creating new Multitest instance: {}".format(self))
        self.i = 0

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(4):
                time.sleep(1); self.i += 1
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        print("i", self.i)
        print("Copied instance: {}".format(self))
        self.i += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()

结果:

Creating new Multitest instance: <__main__.Multitest object at 0x1056fc8b0>
i 1
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
i 2
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
i 3
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
input 0
input 1
input 2
i 4
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
input 3

关于你的第二个问题,我想如果你想在一个进程中维护状态,你可能只需要提交一个作业。不是 Pool(2) 处理 10 个独立作业,而是让 Pool(2) 处理 2 个独立作业,每个作业由 5 个相互依赖的子作业组成。或者,如果您真的想要 10 个作业,则可以使用由 pid 索引的共享数据结构,这样在单个进程中(按顺序)运行的所有作业都可以操作 i 的单个副本。

这是一个共享数据结构的示例,在模块中以全局形式存在:

from multiprocessing.pool import Pool
from collections import defaultdict
import os
import myglobals # (empty .py file)

myglobals.i = defaultdict(lambda:0)

class Multitest:
    def __init__(self):
        pid = os.getpid()
        print("Creating new Multitest instance: {}".format(self))
        print("i {} (pid: {})".format(myglobals.i[pid], pid))

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(4):
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        pid = os.getpid()
        print("Copied instance: {}".format(self))
        print("i {} (pid: {})".format(myglobals.i[pid], pid))
        myglobals.i[pid] += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()

结果:

Creating new Multitest instance: <__main__.Multitest object at 0x1083f3880>
i 0 (pid: 3460)
Copied instance: <__mp_main__.Multitest object at 0x10d89cdf0>
i 0 (pid: 3463)
Copied instance: <__mp_main__.Multitest object at 0x10d89ce50>
Copied instance: <__mp_main__.Multitest object at 0x10550adf0>
i 0 (pid: 3462)
Copied instance: <__mp_main__.Multitest object at 0x10550ae50>
i 1 (pid: 3462)
i 1 (pid: 3463)
input 0
input 1
input 2
input 3

此技术来自

多进程和线程之间的一个区别是,在创建进程后,它使用的内存实际上是从它的父进程中克隆的,因此进程之间没有共享内存。

这是一个例子:

import os
import time
from threading import Thread

global_counter = 0

def my_thread():
    global global_counter
    print("in thread, global_counter is %r, add one." % global_counter)
    global_counter += 1

def test_thread():
    global global_counter
    th = Thread(target=my_thread)
    th.start()
    th.join()
    print("in parent, child thread joined, global_counter is %r now." % global_counter)

def test_fork():
    global global_counter
    pid = os.fork()
    if pid == 0:
        print("in child process, global_counter is %r, add one." % global_counter)
        global_counter += 1
        exit()
    time.sleep(1)
    print("in parent, child process died, global_counter is still %r." % global_counter)

def main():
    test_thread()
    test_fork()

if __name__ == "__main__":
    main()

输出:

in thread, global_counter is 0, add one.
in parent, child thread joined, global_counter is 1 now.
in child process, global_counter is 1, add one.
in parent, child process died, global_counter is still 1.

你的情况:

for j in range(10):
    # Before fork, self.i is 0, fork() dups memory, so the variable is not shared to the child.
    job = pool.apply_async(self.process, (j,))
    # After job finishes, child's self.i is 1 (not parent's), this variable is freed after child dies.
    worker_jobs.append(job)

编辑:

在python3 pickling 中,绑定方法也将包括对象本身,本质上是复制它。因此,每次 apply_async 被调用时,对象 self 也会被 pickled。

import os
from multiprocessing.pool import Pool
import pickle

class Multitest:
    def __init__(self):
        self.i = "myattr"

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(10):
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        print("i", self.i)
        self.i += "|append"

        return inp

def test_pickle():
    m = Multitest()
    print("original instance is %r" % m)

    pickled_method = pickle.dumps(m.process)
    assert b"myattr" in pickled_method

    unpickled_method = pickle.loads(pickled_method)
    # get instance from it's method (python 3)
    print("pickle duplicates the instance, new instance is %r" % unpickled_method.__self__)

if __name__ == '__main__':
    test_pickle()

输出:

original instance is <__main__.Multitest object at 0x1072828d0>
pickle duplicates the instance, new instance is <__main__.Multitest object at 0x107283110>

我认为发生了以下情况:

  1. 每次调用self.process时,方法都会被序列化(pickled)并发送给子进程。每次创建一个新副本。
  2. 该方法在子进程中运行,但由于它是单独副本的一部分,与父进程中的原始副本不同,其更改后的状态不会也不可能影响父进程。传回的唯一信息是 return 值(也是 pickled)。

请注意,子进程没有自己的 Multitest 实例,因为它仅在 __name__ == '__main__' 时创建,不适用于池创建的分叉。

如果你想在子进程中保持状态,你可以用全局变量来实现。您可以在创建池时传递初始化参数来初始化此类变量。

以下显示了您想要的工作版本(但没有 OOP,它不适用于多处理):

from multiprocessing.pool import Pool


def initialize():
    global I
    I = 0


def process(inp):
    global I
    print("I", I)
    I += 1
    return inp


if __name__ == '__main__':
    with Pool(2, initializer=initialize) as pool:
        worker_jobs = []
        for j in range(10):
            job = pool.apply_async(process, (j,))
            worker_jobs.append(job)

        for job in worker_jobs:
            res = job.get()
            print("input", res)