multiprocessing.Queue 的破管错误

Broken pipe error with multiprocessing.Queue

在 python2.7 中,multiprocessing.Queue 在函数内部初始化时抛出错误。我提供了一个重现问题的最小示例。

#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing

def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)

if __name__ == "__main__":
    main()

抛出下面的破管错误

Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
IOError: [Errno 32] Broken pipe

Process finished with exit code 0

我无法理解为什么。我们不能从函数内部填充 Queue 对象肯定会很奇怪。

编辑:请使用 答案,这样更好。使用 join_thread 确保队列以比我建议的 time.sleep(0.1) 更好的方式完成工作。

这里发生的事情是,当你调用 main() 时,它会创建 Queue,将 10 个对象放入其中并结束函数,垃圾收集其所有内部变量和对象,包括Queue。 但是您收到此错误是因为您仍在尝试发送 Queue.

中的最后一个数字

来自文档 documentation :

"When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe."

由于 put() 是在另一个线程中创建的,因此不会阻塞脚本的执行,并允许在完成队列操作之前结束 main() 函数。

试试这个:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing
import time
def main():
    q = multiprocessing.Queue()
    for i in range(10):
        print i
        q.put(i)
    time.sleep(0.1) # Just enough to let the Queue finish

if __name__ == "__main__":
    main()

应该有一种方法可以 join 排队或阻止执行,直到对象被放入 Queue,你应该看看文档。

按照@HarryPotFleur 的建议使用time.sleep(0.1) 进行延迟,此问题已解决。但是,我用 python3 测试了代码,在 python3 中根本没有发生管道损坏的问题。我认为这是作为错误报告的,后来得到了修复。

当您启动 Queue.put() 时,将启动隐式线程以将数据传送到队列。同时,主应用程序完成,数据没有终点站(队列对象被垃圾收集)。

我会试试这个:

from multiprocessing import Queue

def main():
    q = Queue()
    for i in range(10):
        print i
        q.put(i)
    q.close()
    q.join_thread()

if __name__ == "__main__":
    main()

join_thread() 确保缓冲区中的所有数据都已刷新。 close() 必须在 join_thread()

之前调用