具有多处理锁的共享计数器在 Windows 中不起作用

Shared counter with multiprocessing Lock is not working in Windows

我想在 multiprocessing.Pool 中有一个共享计数器,我使用以下代码打印不同的输入列表:

import multiprocessing

running = multiprocessing.Value('i', 0)

def f(x):
    global running
    global lock

    # ... code ...

    with lock:
        running.value -= 1
        print(f"Still running: {running.value}\n", end='', flush=True)

    return x

if __name__ == '__main__':
    lock = multiprocessing.Lock()

    rangeval = range(100)
    running.value = len(rangeval)

    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    result = pool.map(f, iterable=rangeval)

这在 Mac 和 Linux 中效果很好。但是当我 运行 它在 Windows 它产生一个错误:

  File "C:\...\...\...\...\main.py", line 11, in f
    with lock:
       NameError: name 'lock' is not defined

当我将 lock = multiprocessing.Lock() 放在函数 f 之上的 if __name__ == '__main__' 之外时,它会产生如下奇怪的输出:

Still running: -1
Still running: -2
Still running: -3
Still running: -4
Still running: -1
Still running: -2
Still running: -3
Still running: -4

如何在 Windows 中解决这个问题?

这可以通过调用

使其在 macOS 和 Linux 上不起作用
multiprocessing.set_start_method("spawn", force=True)

(这些操作系统上的默认值可能是 fork。)

您不需要单独的锁; Values have a lock of their own.

您需要克服一些困难才能在子进程初始化时正确地将共享内存值移动到子进程中。 (灵感来自 this answer。)

import multiprocessing

# multiprocessing.set_start_method("spawn", force=True)

running: multiprocessing.Value  # assigned in initproc


def f(x):
    with running.get_lock():
        running.value -= 1
        print(f"Still running: {running.value}\n", end="", flush=True)

    return x


def initproc(r):
    global running
    running = r


def main():
    running = multiprocessing.Value("i", 0)
    rangeval = range(10)
    running.value = len(rangeval)

    with multiprocessing.Pool(
        processes=multiprocessing.cpu_count(), initializer=initproc, initargs=(running,)
    ) as pool:
        pool.map(f, iterable=rangeval)


if __name__ == "__main__":
    main()

multiprocessing.Value 对象需要传递给子进程。这是一个可以帮助您理解其用法的简单示例:

from multiprocessing import Process, Value


def p1(v):
    with v.get_lock():
        v.value += 1


def p2(v):
    with v.get_lock():
        v.value -= 1


if __name__ == '__main__':
    v = Value('i', 0)
    plist = []
    for _ in range(10):
        for p in [p1, p2]:
            _p = Process(target=p, args=[v])
            plist.append(_p)
            _p.start()
    for p in plist:
        p.join()
    assert v.value == 0

这里我们创建一个初始化为零的值对象。我们有两个函数将 运行 作为子进程。 p1 将递增该值,p2 将递减该值。我们 运行 p1 和 p2 各有 10 个(有效的)并发实例。换句话说,该值将增加 10 次并减少 10 次,最终为零