在多处理中维护实例状态 apply_async
Maintain instance state in multiprocessing apply_async
我预计,如果我在实例方法中调用 apply_async
并获得其结果,所做的任何更改都将保留在分叉进程中。但是,似乎每次对 apply_async 的新调用都会创建该实例的新副本。
取以下代码:
from multiprocessing.pool import Pool
class Multitest:
def __init__(self):
self.i = 0
def run(self):
with Pool(2) as pool:
worker_jobs = []
for j in range(10):
job = pool.apply_async(self.process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)
def process(self, inp):
print("i", self.i)
self.i += 1
return inp
if __name__ == '__main__':
mt = Multitest()
mt.run()
示例输出:
i 0
i 0
i 0
i 0
i 0
input 0
i 0
i 0
i 0
i 0
i 0
input 1
input 2
input 3
input 4
input 5
input 6
input 7
input 8
input 9
但由于我们有两个核心,其中分布着 10 个输入,我曾预计 i
属性会增加。
我曾预料到以下流程:
- 主线程创建实例并调用
run()
- 主线程通过初始化两个新进程和原始 Multitest 实例的副本(其中
i = 0
)在池上分配 apply_async
的工作
process()
在新进程上多次调用(直到 range()
耗尽)。在每次调用进程时,该进程的 self.i
递增
注意:我不是询问两个进程之间的共享状态。相反,我问的是为什么单个进程的 class 实例没有发生变化(为什么每个进程的 self.i
没有增加)。
但是,我没有看到这种行为。相反,打印输出只有零,表明我的预期是错误的:状态 (属性 i
) 没有被维护,但是一个新的实例(或者至少是一个新的副本)在每个致电 apply_async
。我在这里缺少什么,我怎样才能按预期进行这项工作? (最好使用 apply_async
,虽然不是必需的。但是结果的顺序应该保持不变。)
据我所知,此行为并非特定于 apply_async
,也特定于其他 pool
方法。我有兴趣了解 为什么 会发生这种情况以及 如何 可以将行为更改为我想要实现的行为。赏金转到可以为两个查询提供答案的答案。
我想向您指出参考资料,但我还没有,所以我将根据经验证据分享我的想法:
每次调用 apply_async 都会准备命名空间的新副本。您可以通过在进程内部添加对 print(self)
的调用来查看这一点。所以这部分是不正确的:
main thread distributes work ... by initializing two new processes and
a copy of the original Multitest instance
相反,有两个新进程和 十 个原始 Multitest 实例的副本。所有这些副本都是从主进程制作的,它没有增加 i 的副本。为了证明这一点,在调用 apply_async 之前添加 time.sleep(1); self.i += 1
,并注意 a) 主线程中 i 的值递增,以及 b) 通过延迟 for 循环,原始 Multitest 实例具有到下一次调用 apply_async 触发新副本时更改。
代码:
from multiprocessing.pool import Pool
import time
class Multitest:
def __init__(self):
print("Creating new Multitest instance: {}".format(self))
self.i = 0
def run(self):
with Pool(2) as pool:
worker_jobs = []
for j in range(4):
time.sleep(1); self.i += 1
job = pool.apply_async(self.process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)
def process(self, inp):
print("i", self.i)
print("Copied instance: {}".format(self))
self.i += 1
return inp
if __name__ == '__main__':
mt = Multitest()
mt.run()
结果:
Creating new Multitest instance: <__main__.Multitest object at 0x1056fc8b0>
i 1
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
i 2
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
i 3
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
input 0
input 1
input 2
i 4
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
input 3
关于你的第二个问题,我想如果你想在一个进程中维护状态,你可能只需要提交一个作业。不是 Pool(2) 处理 10 个独立作业,而是让 Pool(2) 处理 2 个独立作业,每个作业由 5 个相互依赖的子作业组成。或者,如果您真的想要 10 个作业,则可以使用由 pid 索引的共享数据结构,这样在单个进程中(按顺序)运行的所有作业都可以操作 i 的单个副本。
这是一个共享数据结构的示例,在模块中以全局形式存在:
from multiprocessing.pool import Pool
from collections import defaultdict
import os
import myglobals # (empty .py file)
myglobals.i = defaultdict(lambda:0)
class Multitest:
def __init__(self):
pid = os.getpid()
print("Creating new Multitest instance: {}".format(self))
print("i {} (pid: {})".format(myglobals.i[pid], pid))
def run(self):
with Pool(2) as pool:
worker_jobs = []
for j in range(4):
job = pool.apply_async(self.process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)
def process(self, inp):
pid = os.getpid()
print("Copied instance: {}".format(self))
print("i {} (pid: {})".format(myglobals.i[pid], pid))
myglobals.i[pid] += 1
return inp
if __name__ == '__main__':
mt = Multitest()
mt.run()
结果:
Creating new Multitest instance: <__main__.Multitest object at 0x1083f3880>
i 0 (pid: 3460)
Copied instance: <__mp_main__.Multitest object at 0x10d89cdf0>
i 0 (pid: 3463)
Copied instance: <__mp_main__.Multitest object at 0x10d89ce50>
Copied instance: <__mp_main__.Multitest object at 0x10550adf0>
i 0 (pid: 3462)
Copied instance: <__mp_main__.Multitest object at 0x10550ae50>
i 1 (pid: 3462)
i 1 (pid: 3463)
input 0
input 1
input 2
input 3
此技术来自
多进程和线程之间的一个区别是,在创建进程后,它使用的内存实际上是从它的父进程中克隆的,因此进程之间没有共享内存。
这是一个例子:
import os
import time
from threading import Thread
global_counter = 0
def my_thread():
global global_counter
print("in thread, global_counter is %r, add one." % global_counter)
global_counter += 1
def test_thread():
global global_counter
th = Thread(target=my_thread)
th.start()
th.join()
print("in parent, child thread joined, global_counter is %r now." % global_counter)
def test_fork():
global global_counter
pid = os.fork()
if pid == 0:
print("in child process, global_counter is %r, add one." % global_counter)
global_counter += 1
exit()
time.sleep(1)
print("in parent, child process died, global_counter is still %r." % global_counter)
def main():
test_thread()
test_fork()
if __name__ == "__main__":
main()
输出:
in thread, global_counter is 0, add one.
in parent, child thread joined, global_counter is 1 now.
in child process, global_counter is 1, add one.
in parent, child process died, global_counter is still 1.
你的情况:
for j in range(10):
# Before fork, self.i is 0, fork() dups memory, so the variable is not shared to the child.
job = pool.apply_async(self.process, (j,))
# After job finishes, child's self.i is 1 (not parent's), this variable is freed after child dies.
worker_jobs.append(job)
编辑:
在python3 pickling 中,绑定方法也将包括对象本身,本质上是复制它。因此,每次 apply_async
被调用时,对象 self
也会被 pickled。
import os
from multiprocessing.pool import Pool
import pickle
class Multitest:
def __init__(self):
self.i = "myattr"
def run(self):
with Pool(2) as pool:
worker_jobs = []
for j in range(10):
job = pool.apply_async(self.process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)
def process(self, inp):
print("i", self.i)
self.i += "|append"
return inp
def test_pickle():
m = Multitest()
print("original instance is %r" % m)
pickled_method = pickle.dumps(m.process)
assert b"myattr" in pickled_method
unpickled_method = pickle.loads(pickled_method)
# get instance from it's method (python 3)
print("pickle duplicates the instance, new instance is %r" % unpickled_method.__self__)
if __name__ == '__main__':
test_pickle()
输出:
original instance is <__main__.Multitest object at 0x1072828d0>
pickle duplicates the instance, new instance is <__main__.Multitest object at 0x107283110>
我认为发生了以下情况:
- 每次调用
self.process
时,方法都会被序列化(pickled)并发送给子进程。每次创建一个新副本。
- 该方法在子进程中运行,但由于它是单独副本的一部分,与父进程中的原始副本不同,其更改后的状态不会也不可能影响父进程。传回的唯一信息是 return 值(也是 pickled)。
请注意,子进程没有自己的 Multitest
实例,因为它仅在 __name__ == '__main__'
时创建,不适用于池创建的分叉。
如果你想在子进程中保持状态,你可以用全局变量来实现。您可以在创建池时传递初始化参数来初始化此类变量。
以下显示了您想要的工作版本(但没有 OOP,它不适用于多处理):
from multiprocessing.pool import Pool
def initialize():
global I
I = 0
def process(inp):
global I
print("I", I)
I += 1
return inp
if __name__ == '__main__':
with Pool(2, initializer=initialize) as pool:
worker_jobs = []
for j in range(10):
job = pool.apply_async(process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)
我预计,如果我在实例方法中调用 apply_async
并获得其结果,所做的任何更改都将保留在分叉进程中。但是,似乎每次对 apply_async 的新调用都会创建该实例的新副本。
取以下代码:
from multiprocessing.pool import Pool
class Multitest:
def __init__(self):
self.i = 0
def run(self):
with Pool(2) as pool:
worker_jobs = []
for j in range(10):
job = pool.apply_async(self.process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)
def process(self, inp):
print("i", self.i)
self.i += 1
return inp
if __name__ == '__main__':
mt = Multitest()
mt.run()
示例输出:
i 0
i 0
i 0
i 0
i 0
input 0
i 0
i 0
i 0
i 0
i 0
input 1
input 2
input 3
input 4
input 5
input 6
input 7
input 8
input 9
但由于我们有两个核心,其中分布着 10 个输入,我曾预计 i
属性会增加。
我曾预料到以下流程:
- 主线程创建实例并调用
run()
- 主线程通过初始化两个新进程和原始 Multitest 实例的副本(其中
i = 0
)在池上分配apply_async
的工作 process()
在新进程上多次调用(直到range()
耗尽)。在每次调用进程时,该进程的self.i
递增
注意:我不是询问两个进程之间的共享状态。相反,我问的是为什么单个进程的 class 实例没有发生变化(为什么每个进程的 self.i
没有增加)。
但是,我没有看到这种行为。相反,打印输出只有零,表明我的预期是错误的:状态 (属性 i
) 没有被维护,但是一个新的实例(或者至少是一个新的副本)在每个致电 apply_async
。我在这里缺少什么,我怎样才能按预期进行这项工作? (最好使用 apply_async
,虽然不是必需的。但是结果的顺序应该保持不变。)
据我所知,此行为并非特定于 apply_async
,也特定于其他 pool
方法。我有兴趣了解 为什么 会发生这种情况以及 如何 可以将行为更改为我想要实现的行为。赏金转到可以为两个查询提供答案的答案。
我想向您指出参考资料,但我还没有,所以我将根据经验证据分享我的想法:
每次调用 apply_async 都会准备命名空间的新副本。您可以通过在进程内部添加对 print(self)
的调用来查看这一点。所以这部分是不正确的:
main thread distributes work ... by initializing two new processes and a copy of the original Multitest instance
相反,有两个新进程和 十 个原始 Multitest 实例的副本。所有这些副本都是从主进程制作的,它没有增加 i 的副本。为了证明这一点,在调用 apply_async 之前添加 time.sleep(1); self.i += 1
,并注意 a) 主线程中 i 的值递增,以及 b) 通过延迟 for 循环,原始 Multitest 实例具有到下一次调用 apply_async 触发新副本时更改。
代码:
from multiprocessing.pool import Pool
import time
class Multitest:
def __init__(self):
print("Creating new Multitest instance: {}".format(self))
self.i = 0
def run(self):
with Pool(2) as pool:
worker_jobs = []
for j in range(4):
time.sleep(1); self.i += 1
job = pool.apply_async(self.process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)
def process(self, inp):
print("i", self.i)
print("Copied instance: {}".format(self))
self.i += 1
return inp
if __name__ == '__main__':
mt = Multitest()
mt.run()
结果:
Creating new Multitest instance: <__main__.Multitest object at 0x1056fc8b0>
i 1
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
i 2
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
i 3
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
input 0
input 1
input 2
i 4
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
input 3
关于你的第二个问题,我想如果你想在一个进程中维护状态,你可能只需要提交一个作业。不是 Pool(2) 处理 10 个独立作业,而是让 Pool(2) 处理 2 个独立作业,每个作业由 5 个相互依赖的子作业组成。或者,如果您真的想要 10 个作业,则可以使用由 pid 索引的共享数据结构,这样在单个进程中(按顺序)运行的所有作业都可以操作 i 的单个副本。
这是一个共享数据结构的示例,在模块中以全局形式存在:
from multiprocessing.pool import Pool
from collections import defaultdict
import os
import myglobals # (empty .py file)
myglobals.i = defaultdict(lambda:0)
class Multitest:
def __init__(self):
pid = os.getpid()
print("Creating new Multitest instance: {}".format(self))
print("i {} (pid: {})".format(myglobals.i[pid], pid))
def run(self):
with Pool(2) as pool:
worker_jobs = []
for j in range(4):
job = pool.apply_async(self.process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)
def process(self, inp):
pid = os.getpid()
print("Copied instance: {}".format(self))
print("i {} (pid: {})".format(myglobals.i[pid], pid))
myglobals.i[pid] += 1
return inp
if __name__ == '__main__':
mt = Multitest()
mt.run()
结果:
Creating new Multitest instance: <__main__.Multitest object at 0x1083f3880>
i 0 (pid: 3460)
Copied instance: <__mp_main__.Multitest object at 0x10d89cdf0>
i 0 (pid: 3463)
Copied instance: <__mp_main__.Multitest object at 0x10d89ce50>
Copied instance: <__mp_main__.Multitest object at 0x10550adf0>
i 0 (pid: 3462)
Copied instance: <__mp_main__.Multitest object at 0x10550ae50>
i 1 (pid: 3462)
i 1 (pid: 3463)
input 0
input 1
input 2
input 3
此技术来自
多进程和线程之间的一个区别是,在创建进程后,它使用的内存实际上是从它的父进程中克隆的,因此进程之间没有共享内存。
这是一个例子:
import os
import time
from threading import Thread
global_counter = 0
def my_thread():
global global_counter
print("in thread, global_counter is %r, add one." % global_counter)
global_counter += 1
def test_thread():
global global_counter
th = Thread(target=my_thread)
th.start()
th.join()
print("in parent, child thread joined, global_counter is %r now." % global_counter)
def test_fork():
global global_counter
pid = os.fork()
if pid == 0:
print("in child process, global_counter is %r, add one." % global_counter)
global_counter += 1
exit()
time.sleep(1)
print("in parent, child process died, global_counter is still %r." % global_counter)
def main():
test_thread()
test_fork()
if __name__ == "__main__":
main()
输出:
in thread, global_counter is 0, add one.
in parent, child thread joined, global_counter is 1 now.
in child process, global_counter is 1, add one.
in parent, child process died, global_counter is still 1.
你的情况:
for j in range(10):
# Before fork, self.i is 0, fork() dups memory, so the variable is not shared to the child.
job = pool.apply_async(self.process, (j,))
# After job finishes, child's self.i is 1 (not parent's), this variable is freed after child dies.
worker_jobs.append(job)
编辑:
在python3 pickling 中,绑定方法也将包括对象本身,本质上是复制它。因此,每次 apply_async
被调用时,对象 self
也会被 pickled。
import os
from multiprocessing.pool import Pool
import pickle
class Multitest:
def __init__(self):
self.i = "myattr"
def run(self):
with Pool(2) as pool:
worker_jobs = []
for j in range(10):
job = pool.apply_async(self.process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)
def process(self, inp):
print("i", self.i)
self.i += "|append"
return inp
def test_pickle():
m = Multitest()
print("original instance is %r" % m)
pickled_method = pickle.dumps(m.process)
assert b"myattr" in pickled_method
unpickled_method = pickle.loads(pickled_method)
# get instance from it's method (python 3)
print("pickle duplicates the instance, new instance is %r" % unpickled_method.__self__)
if __name__ == '__main__':
test_pickle()
输出:
original instance is <__main__.Multitest object at 0x1072828d0>
pickle duplicates the instance, new instance is <__main__.Multitest object at 0x107283110>
我认为发生了以下情况:
- 每次调用
self.process
时,方法都会被序列化(pickled)并发送给子进程。每次创建一个新副本。 - 该方法在子进程中运行,但由于它是单独副本的一部分,与父进程中的原始副本不同,其更改后的状态不会也不可能影响父进程。传回的唯一信息是 return 值(也是 pickled)。
请注意,子进程没有自己的 Multitest
实例,因为它仅在 __name__ == '__main__'
时创建,不适用于池创建的分叉。
如果你想在子进程中保持状态,你可以用全局变量来实现。您可以在创建池时传递初始化参数来初始化此类变量。
以下显示了您想要的工作版本(但没有 OOP,它不适用于多处理):
from multiprocessing.pool import Pool
def initialize():
global I
I = 0
def process(inp):
global I
print("I", I)
I += 1
return inp
if __name__ == '__main__':
with Pool(2, initializer=initialize) as pool:
worker_jobs = []
for j in range(10):
job = pool.apply_async(process, (j,))
worker_jobs.append(job)
for job in worker_jobs:
res = job.get()
print("input", res)