Multiprocessing "OSError: [Errno 24] Too many open files": How to clean up jobs and queues?
Multiprocessing "OSError: [Errno 24] Too many open files": How to clean up jobs and queues?
我正在尽最大努力在使用完 Queue
后关闭并清理它们,以便从 Python 的多处理模块中的进程收集输出。这里有一些代码由于 "too many open files" 而在某个时候死掉了。我还能做些什么来清理完整的 jobs/queues 以便我可以随心所欲地做很多事情?
# The following [fails to] demonstrates how to clean up jobs and queues (the queues is key?) to avoid the OSError of too many files open.
def dummy(inv,que):
que.put(inv)
return(0)
from multiprocessing import Process, Queue, cpu_count
nTest=2800
queues=[None for ii in range(nTest)]
for ii in range(nTest):
queues[ii]=Queue()
job=Process(target=dummy, args=[ii,queues[ii]])
job.start()
print('Started job %d'%ii)
job.join()
print('Joined job %d'%ii)
job.terminate()
print('Terminated job %d'%ii)
queues[ii].close()
因为它是一个 OSError,所以我的代码中没有导致问题的特定行。报告如下所示:
...
Terminated job 1006
Started job 1007
Joined job 1007
Terminated job 1007
Started job 1008
Joined job 1008
Terminated job 1008
Started job 1009
Joined job 1009
Terminated job 1009
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<ipython-input-2-5f057cd2fe88> in <module>()
----> 1 breaktest()
... in breaktest()
/usr/lib64/python2.6/multiprocessing/__init__.pyc in Queue(maxsize)
/usr/lib64/python2.6/multiprocessing/queues.pyc in __init__(self, maxsize)
/usr/lib64/python2.6/multiprocessing/synchronize.pyc in __init__(self)
/usr/lib64/python2.6/multiprocessing/synchronize.pyc in __init__(self, kind, value, maxvalue)
OSError: [Errno 24] Too many open files
> /usr/lib64/python2.6/multiprocessing/synchronize.py(49)__init__()
您的脚本在 ~1000 个任务后卡住,因为这是单个进程的文件描述符的默认限制。
队列是用保存文件描述符的管道实现的。管道已通过 GC 正确删除。当您将它们存储在列表中时,它们不会被垃圾收集并且文件描述符会泄漏,直到您的进程不分配其中的 1024 个,然后它崩溃。
您是否需要将队列存储在列表中?
只需在问题陈述的代码中将 queues[ii].close()
替换为 queues[ii] = None
即可避免显示的错误
(感谢评论中的@Finch_Powers)。
但是,我遇到了更多相关问题(这将是一个单独的问题),并且我的实际问题的更通用解决方案(激发了我 post 中的玩具示例)要小心避免让任何循环变量直接引用队列或包含它们的任何对象。当我完成队列时,结合将列表元素设置为 None,可能与手动调用 gc.collect() 结合,导致当我'我完成了每一个。参见 python multiprocessing: some functions do not return when they are complete (queue material too big)
这有帮助的实际代码是 runFunctionsInParallel 函数
https://gitlab.com/cpbl/cpblUtilities/blob/master/parallel.py
我正在尽最大努力在使用完 Queue
后关闭并清理它们,以便从 Python 的多处理模块中的进程收集输出。这里有一些代码由于 "too many open files" 而在某个时候死掉了。我还能做些什么来清理完整的 jobs/queues 以便我可以随心所欲地做很多事情?
# The following [fails to] demonstrates how to clean up jobs and queues (the queues is key?) to avoid the OSError of too many files open.
def dummy(inv,que):
que.put(inv)
return(0)
from multiprocessing import Process, Queue, cpu_count
nTest=2800
queues=[None for ii in range(nTest)]
for ii in range(nTest):
queues[ii]=Queue()
job=Process(target=dummy, args=[ii,queues[ii]])
job.start()
print('Started job %d'%ii)
job.join()
print('Joined job %d'%ii)
job.terminate()
print('Terminated job %d'%ii)
queues[ii].close()
因为它是一个 OSError,所以我的代码中没有导致问题的特定行。报告如下所示:
...
Terminated job 1006
Started job 1007
Joined job 1007
Terminated job 1007
Started job 1008
Joined job 1008
Terminated job 1008
Started job 1009
Joined job 1009
Terminated job 1009
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<ipython-input-2-5f057cd2fe88> in <module>()
----> 1 breaktest()
... in breaktest()
/usr/lib64/python2.6/multiprocessing/__init__.pyc in Queue(maxsize)
/usr/lib64/python2.6/multiprocessing/queues.pyc in __init__(self, maxsize)
/usr/lib64/python2.6/multiprocessing/synchronize.pyc in __init__(self)
/usr/lib64/python2.6/multiprocessing/synchronize.pyc in __init__(self, kind, value, maxvalue)
OSError: [Errno 24] Too many open files
> /usr/lib64/python2.6/multiprocessing/synchronize.py(49)__init__()
您的脚本在 ~1000 个任务后卡住,因为这是单个进程的文件描述符的默认限制。
队列是用保存文件描述符的管道实现的。管道已通过 GC 正确删除。当您将它们存储在列表中时,它们不会被垃圾收集并且文件描述符会泄漏,直到您的进程不分配其中的 1024 个,然后它崩溃。
您是否需要将队列存储在列表中?
只需在问题陈述的代码中将 queues[ii].close()
替换为 queues[ii] = None
即可避免显示的错误
(感谢评论中的@Finch_Powers)。
但是,我遇到了更多相关问题(这将是一个单独的问题),并且我的实际问题的更通用解决方案(激发了我 post 中的玩具示例)要小心避免让任何循环变量直接引用队列或包含它们的任何对象。当我完成队列时,结合将列表元素设置为 None,可能与手动调用 gc.collect() 结合,导致当我'我完成了每一个。参见 python multiprocessing: some functions do not return when they are complete (queue material too big)
这有帮助的实际代码是 runFunctionsInParallel 函数 https://gitlab.com/cpbl/cpblUtilities/blob/master/parallel.py