Python multiprocessing.Queue not receiving puts from forked processes

I am creating a fixed number of forked child processes and trying to have them return results through a multiprocessing.Queue. This leads to some unexpected behavior.

import multiprocessing
import os

def main():
    n_workers = 4

    q = multiprocessing.Queue(n_workers)

    for i in range(n_workers):
        if os.fork() == 0:
            print(f"child {i} put {i}")
            q.put(i)
            print(f"child {i} exiting")
            os._exit(0)

    for i in range(n_workers):
        res = q.get()
        print(f"parent got {res}")

    print("parent exiting")


if __name__ == "__main__":
    main()

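When I run this, every child process puts its result on the queue and terminates, but the parent process hangs after receiving only the first result: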

child 0 put 0
child 1 put 1
child 2 put 2
child 3 put 3
child 0 exiting
child 1 exiting
child 2 exiting
child 3 exiting
parent got 0

The problem is that os._exit(0) is called immediately after the data is put on the queue.

The multiprocessing docs explain how data is added to a queue:

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.

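Because the process was forked, it has to exit with os._exit(0) (as opposed to sys.exit(0)), but os._exit does not do any cleanup. If the background thread has not flushed the data to the pipe by then, the data is lost! That is why only the first result reached the parent here: the remaining children exited before their background threads had written anything.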
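To make the "no cleanup" part concrete, here is a minimal sketch that uses an atexit handler as a stand-in for the work that normally runs at interpreter shutdown. The function name and the pipe are only for illustration (they are not part of the question's code), and it assumes a Unix-like OS where os.fork() is available.

```python
import atexit
import os


def handler_output_after_os_exit():
    """Fork a child that registers an atexit handler, then calls os._exit(0).

    Returns the bytes the handler wrote to a pipe; empty if it never ran.
    (Hypothetical helper, for illustration only.)
    """
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: during a normal interpreter shutdown this handler would
        # write to the pipe.
        os.close(read_fd)
        atexit.register(os.write, write_fd, b"cleanup ran")
        os._exit(0)  # terminates immediately; cleanup handlers are skipped

    # Parent: close our copy of the write end so read() sees EOF once the
    # child is gone.
    os.close(write_fd)
    os.waitpid(pid, 0)
    with os.fdopen(read_fd, "rb") as reader:
        return reader.read()


if __name__ == "__main__":
    print(handler_output_after_os_exit())  # b''
```

The handler never runs, so the parent reads nothing. Data still sitting in the queue's buffer when os._exit is called is dropped in the same way, because nothing waits for the background thread.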

The solution is to call close() followed by join_thread() before exiting:

import multiprocessing
import os

def main():
    n_workers = 4

    q = multiprocessing.Queue(n_workers)

    for i in range(n_workers):
        if os.fork() == 0:
            print(f"child {i} put {i}")
            q.put(i)
            print(f"child {i} exiting")

            q.close()  # indicate nothing else will be queued by this process
            q.join_thread()  # wait for the background thread to flush the data

            os._exit(0)

    for i in range(n_workers):
        res = q.get()
        print(f"parent got {res}")

    print("parent exiting")


if __name__ == "__main__":
    main()
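If the children do real work that might raise, it may help to put the flush in a finally block so it always happens before os._exit. The following is only a sketch along those lines: fork_worker and square are hypothetical names introduced here, and it again assumes a Unix-like OS.

```python
import multiprocessing
import os


def fork_worker(q, work, *args):
    """Fork a child that puts work(*args) on q and flushes before exiting.

    fork_worker is a hypothetical helper, not part of multiprocessing.
    Returns the child's pid to the parent.
    """
    pid = os.fork()
    if pid != 0:
        return pid  # parent

    status = 0
    try:
        q.put(work(*args))
    except BaseException:
        status = 1  # report failure through the exit status
    finally:
        q.close()        # nothing more will be queued by this child
        q.join_thread()  # block until the background thread has flushed
        os._exit(status)


def square(x):
    return x * x


def main():
    n_workers = 4
    q = multiprocessing.Queue(n_workers)
    pids = [fork_worker(q, square, i) for i in range(n_workers)]
    results = sorted(q.get() for _ in pids)
    for pid in pids:
        os.waitpid(pid, 0)  # reap the children so they do not linger as zombies
    return results


if __name__ == "__main__":
    print(main())  # [0, 1, 4, 9]
```

Note that a child whose work() fails puts nothing on the queue, so a parent that expects exactly one result per child would still block in q.get(). Checking the exit statuses from os.waitpid, or using q.get(timeout=...), is one way to guard against that.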