使用多处理池时更新对象的成员变量

Updating member variable of object while using multprocessing pool

我有一个 class B,它由另一个 class A 组成。

在 class B 中,我正在使用多处理池从 class A 调用方法。此方法更新 A 的成员变量(这是一个字典)。

当我打印出这个成员变量时,它似乎没有被更新。这是描述问题的代码:

import multiprocessing as mp

class A():
    def __init__(self):
        self.aDict = {'key': 0}

    def set_lock(self, lock):
        self.lock = lock

    def do_work(self, item):
        print("Doing work for item: {}".format(item) )
        self.aDict['key'] += 1

        return [1,2,3] # return some list

class B():
    def __init__(self):
        self.objA = A()

    def run_with_mp(self):
        items=['item1', 'item2']
        with mp.Pool(processes=mp.cpu_count()) as pool:
            result = pool.map_async(self.objA.do_work, items)
            result.wait()
            pool.terminate()

        print(self.objA.aDict)

    def run(self):
        items=['item1', 'item2']
        for item in items:
            self.objA.do_work(item)

        print(self.objA.aDict)

if __name__ == "__main__":
    b = B()
    b.run_with_mp() # prints {'key': 0}
    b.run()         # prints {'key': 2}

b.run_with_mp() 打印 {'key': 0} 整个 b.run() 打印 {'key': 2}。我认为多处理池版本也会做同样的事情,因为对象 self.objA 具有多处理池运行的 B 的完整 class 范围。

我认为池中的每个工作人员看到的 self.objA 版本不同,这与主程序流程中的版本不同。有没有办法让所有的工人更新一个公共变量?

你接近解释了,的确,每个派生的进程都有自己的内存区域,这意味着它们是独立的。当您 运行 do_work 每个进程都会更新其 aDict 的版本,因为该变量不是共享的。如果要共享一个变量,最简单的方法是使用一个Manager,例如:

import multiprocessing as mp

class A():
    def __init__(self):
        self.aDict = mp.Manager().dict({'key': 0})

    def set_lock(self, lock):
        self.lock = lock

    def do_work(self, item):
        print("Doing work for item: {}".format(item) )
        self.aDict['key'] += 1

        return [1,2,3] # return some list

class B():
    def __init__(self):
        self.objA = A()

    def run_with_mp(self):
        items=['item1', 'item2']
        with mp.Pool(processes=mp.cpu_count()) as pool:
            result = pool.map_async(self.objA.do_work, items)
            result.wait()
            pool.terminate()

        print(self.objA.aDict)

    def run(self):
        items=['item1', 'item2']
        for item in items:
            self.objA.do_work(item)

        print(self.objA.aDict)

if __name__ == "__main__":
    b = B()
    b.run_with_mp() # prints {'key': 2}
    b.run()         # prints {'key': 4}

我修改了您的示例以共享 aDict 变量,因此每个进程都会更新 属性(run_with_mprun 方法)。考虑在 docs.

中阅读更多内容