如何使用 multiprocessing.Manager().Value 来存储总和?

How to use multiprocessing.Manager().Value to store a sum?

我想用 multiprocessing.Pool 累加总和。以下是我的尝试:

import multiprocessing

def add_to_value(addend, value):
    value.value += addend

with multiprocessing.Manager() as manager:
    value = manager.Value(float, 0.0)
    with multiprocessing.Pool(2) as pool:
        pool.starmap(add_to_value,
                     [(float(i), value) for i in range(100)])
    print(value.value)

这会给出不正确甚至不一致的结果。例如,一次给出 2982.0,另一次给出 2927.0。正确的输出是 4950.0,当我在对 Pool 的调用中只使用一个进程而不是 2 个时,我确实得到了这个。我正在使用 Python 3.7.5.

多处理文档(在 multiprocessing.Value 下)对此非常明确:

Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do counter.value += 1.

简而言之,你需要抢到一把锁才能做到这一点。

你可以这样做:

def add_to_value(addend, value, lock):
    with lock:
        value.value += addend

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        lock = manager.Lock()
        value = manager.Value(float, 0.0)
        with multiprocessing.Pool(2) as pool:
            pool.starmap(add_to_value,
                         [(float(i), value, lock) for i in range(100)])
        print(value.value)

这将正确输出 4950.0。

但请注意,由于需要锁定,这种方法将非常昂贵。最有可能的是,与使用单个进程执行该操作相比,完成该操作需要更多时间。

注意: 我还添加了一个 if __name__ == '__main__': 守卫,当使用除分叉。 Windows 和 Mac OS 上的默认值是 spawn,因此确实需要让此代码可移植到这些平台中的任何一个。启动方法 spawnforkserver 在 Linux/Unix 上也可用,因此在某些情况下也需要它。

当您能够将工作卸载给他们可以自己完成的工作时,多处理会更有效率,例如计算部分总和,然后在主进程中将它们加在一起。如果可能,请考虑重新考虑适合该模型的方法。