使用多处理池时更新对象的成员变量
Updating member variable of object while using multprocessing pool
我有一个 class B
,它由另一个 class A
组成。
在 class B
中,我正在使用多处理池从 class A
调用方法。此方法更新 A
的成员变量(这是一个字典)。
当我打印出这个成员变量时,它似乎没有被更新。这是描述问题的代码:
import multiprocessing as mp
class A():
def __init__(self):
self.aDict = {'key': 0}
def set_lock(self, lock):
self.lock = lock
def do_work(self, item):
print("Doing work for item: {}".format(item) )
self.aDict['key'] += 1
return [1,2,3] # return some list
class B():
def __init__(self):
self.objA = A()
def run_with_mp(self):
items=['item1', 'item2']
with mp.Pool(processes=mp.cpu_count()) as pool:
result = pool.map_async(self.objA.do_work, items)
result.wait()
pool.terminate()
print(self.objA.aDict)
def run(self):
items=['item1', 'item2']
for item in items:
self.objA.do_work(item)
print(self.objA.aDict)
if __name__ == "__main__":
b = B()
b.run_with_mp() # prints {'key': 0}
b.run() # prints {'key': 2}
b.run_with_mp()
打印 {'key': 0}
整个 b.run()
打印 {'key': 2}
。我认为多处理池版本也会做同样的事情,因为对象 self.objA
具有多处理池运行的 B
的完整 class 范围。
我认为池中的每个工作人员看到的 self.objA
版本不同,这与主程序流程中的版本不同。有没有办法让所有的工人更新一个公共变量?
你接近解释了,的确,每个派生的进程都有自己的内存区域,这意味着它们是独立的。当您 运行 do_work
每个进程都会更新其 aDict
的版本,因为该变量不是共享的。如果要共享一个变量,最简单的方法是使用一个Manager
,例如:
import multiprocessing as mp
class A():
def __init__(self):
self.aDict = mp.Manager().dict({'key': 0})
def set_lock(self, lock):
self.lock = lock
def do_work(self, item):
print("Doing work for item: {}".format(item) )
self.aDict['key'] += 1
return [1,2,3] # return some list
class B():
def __init__(self):
self.objA = A()
def run_with_mp(self):
items=['item1', 'item2']
with mp.Pool(processes=mp.cpu_count()) as pool:
result = pool.map_async(self.objA.do_work, items)
result.wait()
pool.terminate()
print(self.objA.aDict)
def run(self):
items=['item1', 'item2']
for item in items:
self.objA.do_work(item)
print(self.objA.aDict)
if __name__ == "__main__":
b = B()
b.run_with_mp() # prints {'key': 2}
b.run() # prints {'key': 4}
我修改了您的示例以共享 aDict
变量,因此每个进程都会更新 属性(run_with_mp
和 run
方法)。考虑在 docs.
中阅读更多内容
我有一个 class B
,它由另一个 class A
组成。
在 class B
中,我正在使用多处理池从 class A
调用方法。此方法更新 A
的成员变量(这是一个字典)。
当我打印出这个成员变量时,它似乎没有被更新。这是描述问题的代码:
import multiprocessing as mp
class A():
def __init__(self):
self.aDict = {'key': 0}
def set_lock(self, lock):
self.lock = lock
def do_work(self, item):
print("Doing work for item: {}".format(item) )
self.aDict['key'] += 1
return [1,2,3] # return some list
class B():
def __init__(self):
self.objA = A()
def run_with_mp(self):
items=['item1', 'item2']
with mp.Pool(processes=mp.cpu_count()) as pool:
result = pool.map_async(self.objA.do_work, items)
result.wait()
pool.terminate()
print(self.objA.aDict)
def run(self):
items=['item1', 'item2']
for item in items:
self.objA.do_work(item)
print(self.objA.aDict)
if __name__ == "__main__":
b = B()
b.run_with_mp() # prints {'key': 0}
b.run() # prints {'key': 2}
b.run_with_mp()
打印 {'key': 0}
整个 b.run()
打印 {'key': 2}
。我认为多处理池版本也会做同样的事情,因为对象 self.objA
具有多处理池运行的 B
的完整 class 范围。
我认为池中的每个工作人员看到的 self.objA
版本不同,这与主程序流程中的版本不同。有没有办法让所有的工人更新一个公共变量?
你接近解释了,的确,每个派生的进程都有自己的内存区域,这意味着它们是独立的。当您 运行 do_work
每个进程都会更新其 aDict
的版本,因为该变量不是共享的。如果要共享一个变量,最简单的方法是使用一个Manager
,例如:
import multiprocessing as mp
class A():
def __init__(self):
self.aDict = mp.Manager().dict({'key': 0})
def set_lock(self, lock):
self.lock = lock
def do_work(self, item):
print("Doing work for item: {}".format(item) )
self.aDict['key'] += 1
return [1,2,3] # return some list
class B():
def __init__(self):
self.objA = A()
def run_with_mp(self):
items=['item1', 'item2']
with mp.Pool(processes=mp.cpu_count()) as pool:
result = pool.map_async(self.objA.do_work, items)
result.wait()
pool.terminate()
print(self.objA.aDict)
def run(self):
items=['item1', 'item2']
for item in items:
self.objA.do_work(item)
print(self.objA.aDict)
if __name__ == "__main__":
b = B()
b.run_with_mp() # prints {'key': 2}
b.run() # prints {'key': 4}
我修改了您的示例以共享 aDict
变量,因此每个进程都会更新 属性(run_with_mp
和 run
方法)。考虑在 docs.