Python multiprocessing.Queue 没有收到来自分叉进程的 put
Python multiprocessing.Queue not receiving puts from forked processes
我正在创建固定数量的分叉子进程,并试图通过 multiprocessing.Queue
使它们成为 return 结果。这会导致一些意外行为。
import multiprocessing
import os
def main():
n_workers = 4
q = multiprocessing.Queue(n_workers)
for i in range(n_workers):
if os.fork() == 0:
print(f"child {i} put {i}")
q.put(i)
print(f"child {i} exiting")
os._exit(0)
for i in range(n_workers):
res = q.get()
print(f"parent got {res}")
print("parent exiting")
if __name__ == "__main__":
main()
当我 运行 执行此操作时,所有子进程都将其结果排入队列并终止,但父进程挂起:
child 0 put 0 │
child 1 put 1 │
child 2 put 2 │
child 3 put 3 │
child 0 exiting │
child 1 exiting │
child 2 exiting │
child 3 exiting │
parent got 0
问题是 os._exit(0)
将数据放入队列后立即调用。
multiprocessing docs解释了数据是如何添加到队列中的:
When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.
因为进程是分叉的,所以需要调用 os._exit(0)
(与 sys.exit(0)
相对),但它 does not do any cleanup。如果后台线程还没有刷新数据,就会丢失!
解决方法是调用 close()
然后调用 join_thread()
:
import multiprocessing
import os
def main():
n_workers = 4
q = multiprocessing.Queue(n_workers)
for i in range(n_workers):
if os.fork() == 0:
print(f"child {i} put {i}")
q.put(i)
print(f"child {i} exiting")
q.close() # indicate nothing else will be queued by this process
q.join_thread() # wait for the background thread to flush the data
os._exit(0)
for i in range(n_workers):
res = q.get()
print(f"parent got {res}")
print("parent exiting")
if __name__ == "__main__":
main()
我正在创建固定数量的分叉子进程,并试图通过 multiprocessing.Queue
使它们成为 return 结果。这会导致一些意外行为。
import multiprocessing
import os
def main():
n_workers = 4
q = multiprocessing.Queue(n_workers)
for i in range(n_workers):
if os.fork() == 0:
print(f"child {i} put {i}")
q.put(i)
print(f"child {i} exiting")
os._exit(0)
for i in range(n_workers):
res = q.get()
print(f"parent got {res}")
print("parent exiting")
if __name__ == "__main__":
main()
当我 运行 执行此操作时,所有子进程都将其结果排入队列并终止,但父进程挂起:
child 0 put 0 │
child 1 put 1 │
child 2 put 2 │
child 3 put 3 │
child 0 exiting │
child 1 exiting │
child 2 exiting │
child 3 exiting │
parent got 0
问题是 os._exit(0)
将数据放入队列后立即调用。
multiprocessing docs解释了数据是如何添加到队列中的:
When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.
因为进程是分叉的,所以需要调用 os._exit(0)
(与 sys.exit(0)
相对),但它 does not do any cleanup。如果后台线程还没有刷新数据,就会丢失!
解决方法是调用 close()
然后调用 join_thread()
:
import multiprocessing
import os
def main():
n_workers = 4
q = multiprocessing.Queue(n_workers)
for i in range(n_workers):
if os.fork() == 0:
print(f"child {i} put {i}")
q.put(i)
print(f"child {i} exiting")
q.close() # indicate nothing else will be queued by this process
q.join_thread() # wait for the background thread to flush the data
os._exit(0)
for i in range(n_workers):
res = q.get()
print(f"parent got {res}")
print("parent exiting")
if __name__ == "__main__":
main()