OSError: [Errno 24] Too many open files when using reactor.run() in Twisted
OSError: [Errno 24] Too many open files when using reactor.run() in Twisted
我遇到了一个奇怪的问题:我正在 运行 执行大量 utils.getProcessOutputAndValue('cmd', [args])
命令,结果取决于我是使用 task.react()
还是 [=18] 启动反应堆=]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from progress.bar import IncrementalBar
from twisted.internet import defer
from twisted.internet import task
from twisted.internet import utils
from twisted.python import usage
class Options(usage.Options):
optFlags = [['reactor', 'r', 'Use reactor.run().'],
['task', 't', 'Use task.react().'],
['cwr', 'w', 'Use callWhenRunning().']]
optParameters = [['limit', 'l', 255, 'Number of file descriptors to open.'],
['cmd', 'c', 'echo Testing {i}...', 'Command to run.']]
def run(opt):
limit = int(opt['limit'])
cmd, args = opt['cmd'].split(' ', 1)
bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit)
requests = []
for i in range(0, limit):
try:
_args = args.format(i=i)
args = _args
except KeyError:
pass
requests.append(utils.getProcessOutputAndValue('echo', [args]))
bar.next()
bar.finish()
return defer.gatherResults(requests)
@defer.inlineCallbacks
def main(reactor, opt):
d = defer.Deferred()
limit = int(opt['limit'])
cmd, args = opt['cmd'].split(' ', 1)
bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit)
for i in range(0, limit):
try:
_args = args.format(i=i)
args = _args
except KeyError:
pass
yield utils.getProcessOutputAndValue('echo', [args])
bar.next()
bar.finish()
defer.returnValue(d.callback(True))
if __name__ == '__main__':
opt = Options()
opt.parseOptions()
if opt['reactor']:
from twisted.internet import reactor
task.deferLater(reactor, 0, run, opt)
reactor.run()
elif opt['task']:
from twisted.internet.task import react
react(main, [opt])
elif opt['cwr']:
from twisted.internet import reactor
reactor.callWhenRunning(run, opt)
reactor.run()
在 400 以上使用 limit
时(在我的例子中)出现以下错误:
Upon execvpe echo ['echo', 'Testing 0...'] in environment id 42131264
:Traceback (most recent call last):
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 428, in _fork
self._setupChild(**kwargs)
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 803, in _setupChild
for fd in _listOpenFDs():
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 638, in _listOpenFDs
return detector._listOpenFDs()
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 553, in _listOpenFDs
self._listOpenFDs = self._getImplementation()
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 576, in _getImplementation
after = impl()
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 606, in _procFDImplementation
return [int(fd) for fd in self.listdir(dname)]
OSError: [Errno 24] Too many open files: '/proc/23421/fd'
Unhandled error in Deferred:
如果我使用task.react()
,不会发生
简历中:
python pyerr.py -l100 -r
: 好的
python pyerr.py -l100 -t
: 好的
python pyerr.py -l100 -w
: 确定
python pyerr.py -l400 -r
: OSERR
python pyerr.py -l400 -t
: 确定
python pyerr.py -l400 -w
: OSERR
问题是我有一个使用 reactor 的大应用程序,因为它是一个响应 SMTP 连接的应用程序(所以不能使用 task.react
,我不想停止 reactor)。
我一直认为 task.react
只是在延迟完成后才停止反应器,但我想做的不止这些...
edit:这里是 pstree
比较 task.react
与 reactor.run
reactor.run (python pyerr.py -l400 -r):
init-+-VBoxService---7*[{VBoxService}]
|-acpid
|-atd
|-cron
|-dbus-daemon
|-dhclient
|-6*[getty]
|-master-+-pickup
| `-qmgr
|-mysqld---18*[{mysqld}]
|-nginx---4*[nginx]
|-php5-fpm---2*[php5-fpm]
|-puppet---{puppet}
|-rpc.idmapd
|-rpc.statd
|-rpcbind
|-rsyslogd---3*[{rsyslogd}]
|-ruby---{ruby}
|-sshd-+-3*[sshd---sshd---sftp-server]
| |-sshd---sshd---2*[sftp-server]
| |-sshd---sshd---bash---pstree
| `-sshd---sshd---bash---python-+-323*[echo]
| `-5*[python]
|-systemd-logind
|-systemd-udevd
|-upstart-file-br
|-upstart-socket-
`-upstart-udev-br
task.react (python pyerr.py -l400 -t):
init-+-VBoxService---7*[{VBoxService}]
|-acpid
|-atd
|-cron
|-dbus-daemon
|-dhclient
|-6*[getty]
|-master-+-pickup
| `-qmgr
|-mysqld---18*[{mysqld}]
|-nginx---4*[nginx]
|-php5-fpm---2*[php5-fpm]
|-puppet---{puppet}
|-rpc.idmapd
|-rpc.statd
|-rpcbind
|-rsyslogd---3*[{rsyslogd}]
|-ruby---{ruby}
|-sshd-+-3*[sshd---sshd---sftp-server]
| |-sshd---sshd---2*[sftp-server]
| |-sshd---sshd---bash---pstree
| `-sshd---sshd---bash---python---echo
|-systemd-logind
|-systemd-udevd
|-upstart-file-br
|-upstart-socket-
`-upstart-udev-br
注意这两者的区别
| `-sshd---sshd---bash---python-+-323*[echo]
| `-5*[python]
还有这个
| `-sshd---sshd---bash---python---echo
在一个 cas 中,进程似乎不会在完成后立即关闭。
我已经在 4 台不同的机器上测试过这个问题:
- Ubuntu 14.04
- 森托斯 6
- Centos 7
问题完全一样。
要试一试,请尝试 运行 watch -n 0.1 "pstree"
以了解流程如何演变。
编辑:由于 Glyph 的回答,我明白了为什么会发生这种情况,但是如何使它适应我的现实生活案例?
我用 Twisted 开发的应用程序是一个基于 Milter 的 SMTP 过滤器,这里是它的工作原理(假设我们要检查电子邮件签名):
- 连接在端口 25 上打开
- milter 协议获取所有电子邮件详细信息
- milter 调用远程 "module" 服务器,该服务器将使用
/usr/bin/openssl mime
调用 处理签名检查
- 模块将return一个表明签名是否有效的答案
在这种情况下,我的问题是我有 150 个同时连接,将有 150 次调用模块(TCP 协议),并且此模块将在每个连接中调用一次 openssl 命令。
该模块是完全不可知的,因此不知道其他调用是否正在 运行ning。在你看来我应该把 DeferredSemaphore
放在哪里?
我的问题是 smtp 连接也是不可知的,不知道其他可能打开的连接。
您认为处理这种并行性的正确方法是什么?
这里的问题与 task.react
和 reactor.run
之间的区别无关,而是 run
和 [=14] 的实现之间细微但重要的区别=] 函数。
不同之处在于 run
正在生成 limit
个进程 并行 ,累积了数千个同时打开的文件描述符,很容易突破系统的限制.但是,main
甚至在启动下一个进程之前等待每个进程完全执行完毕,这意味着它一次使用的进程永远不会超过 4 或 5 个。
原因是 main
被 inlineCallbacks
装饰并且产生每个 getProcessOutputAndValue
Deferred
,它暂停执行 main
直到 Deferred
已完成。
在实际应用中,这两种方式都不理想。你想要一些并行性,但不是无限的。 Twisted 附带了一些实用程序,例如 DeferredSemaphore
, to facilitate limited parallelism without restricting everything to only run one task at a time. Jean-Paul Calderone wrote an article - 10 years ago! - that explains how to use this, here.
但是,为了证明该问题与 task.react
无关,这里是您示例的修改版本,它删除了 run
函数并使用main
:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from progress.bar import IncrementalBar
from twisted.internet import defer
from twisted.internet import task
from twisted.internet import utils
from twisted.python import usage
class Options(usage.Options):
optFlags = [['reactor', 'r', 'Use reactor.run().'],
['task', 't', 'Use task.react().'],
['cwr', 'w', 'Use callWhenRunning().']]
optParameters = [['limit', 'l', 255, 'Number of file descriptors to open.'],
['cmd', 'c', 'echo Testing {i}...', 'Command to run.']]
@defer.inlineCallbacks
def main(reactor, opt):
d = defer.Deferred()
limit = int(opt['limit'])
cmd, args = opt['cmd'].split(' ', 1)
bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit)
for i in range(0, limit):
try:
_args = args.format(i=i)
args = _args
except KeyError:
pass
yield utils.getProcessOutputAndValue('echo', [args])
bar.next()
bar.finish()
defer.returnValue(d.callback(True))
if __name__ == '__main__':
opt = Options()
opt.parseOptions()
if opt['reactor']:
from twisted.internet import reactor
task.deferLater(reactor, 0, main, reactor, opt)
reactor.run()
elif opt['task']:
from twisted.internet.task import react
react(main, [opt])
elif opt['cwr']:
from twisted.internet import reactor
reactor.callWhenRunning(main, reactor, opt)
reactor.run()
编辑,回复问题中的编辑:
由于您的 真正的 问题是传入连接,而不仅仅是 for
循环,而不是使用 DeferredSemaphore
,您可能需要维护一个计数器,并利用对象从 listenTCP
, or the result of the Deferred
that comes back from TCP4ServerEndpoint
, implements IPushProducer
返回的事实,并在太多并发连接正在工作时调用 pauseProducing()
,并在工作完成时调用 resumeProducing()
。
我遇到了一个奇怪的问题:我正在 运行 执行大量 utils.getProcessOutputAndValue('cmd', [args])
命令,结果取决于我是使用 task.react()
还是 [=18] 启动反应堆=]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from progress.bar import IncrementalBar
from twisted.internet import defer
from twisted.internet import task
from twisted.internet import utils
from twisted.python import usage
class Options(usage.Options):
optFlags = [['reactor', 'r', 'Use reactor.run().'],
['task', 't', 'Use task.react().'],
['cwr', 'w', 'Use callWhenRunning().']]
optParameters = [['limit', 'l', 255, 'Number of file descriptors to open.'],
['cmd', 'c', 'echo Testing {i}...', 'Command to run.']]
def run(opt):
limit = int(opt['limit'])
cmd, args = opt['cmd'].split(' ', 1)
bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit)
requests = []
for i in range(0, limit):
try:
_args = args.format(i=i)
args = _args
except KeyError:
pass
requests.append(utils.getProcessOutputAndValue('echo', [args]))
bar.next()
bar.finish()
return defer.gatherResults(requests)
@defer.inlineCallbacks
def main(reactor, opt):
d = defer.Deferred()
limit = int(opt['limit'])
cmd, args = opt['cmd'].split(' ', 1)
bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit)
for i in range(0, limit):
try:
_args = args.format(i=i)
args = _args
except KeyError:
pass
yield utils.getProcessOutputAndValue('echo', [args])
bar.next()
bar.finish()
defer.returnValue(d.callback(True))
if __name__ == '__main__':
opt = Options()
opt.parseOptions()
if opt['reactor']:
from twisted.internet import reactor
task.deferLater(reactor, 0, run, opt)
reactor.run()
elif opt['task']:
from twisted.internet.task import react
react(main, [opt])
elif opt['cwr']:
from twisted.internet import reactor
reactor.callWhenRunning(run, opt)
reactor.run()
在 400 以上使用 limit
时(在我的例子中)出现以下错误:
Upon execvpe echo ['echo', 'Testing 0...'] in environment id 42131264
:Traceback (most recent call last):
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 428, in _fork
self._setupChild(**kwargs)
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 803, in _setupChild
for fd in _listOpenFDs():
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 638, in _listOpenFDs
return detector._listOpenFDs()
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 553, in _listOpenFDs
self._listOpenFDs = self._getImplementation()
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 576, in _getImplementation
after = impl()
File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 606, in _procFDImplementation
return [int(fd) for fd in self.listdir(dname)]
OSError: [Errno 24] Too many open files: '/proc/23421/fd'
Unhandled error in Deferred:
如果我使用task.react()
简历中:
python pyerr.py -l100 -r
: 好的python pyerr.py -l100 -t
: 好的python pyerr.py -l100 -w
: 确定python pyerr.py -l400 -r
: OSERRpython pyerr.py -l400 -t
: 确定python pyerr.py -l400 -w
: OSERR
问题是我有一个使用 reactor 的大应用程序,因为它是一个响应 SMTP 连接的应用程序(所以不能使用 task.react
,我不想停止 reactor)。
我一直认为 task.react
只是在延迟完成后才停止反应器,但我想做的不止这些...
edit:这里是 pstree
比较 task.react
与 reactor.run
reactor.run (python pyerr.py -l400 -r):
init-+-VBoxService---7*[{VBoxService}]
|-acpid
|-atd
|-cron
|-dbus-daemon
|-dhclient
|-6*[getty]
|-master-+-pickup
| `-qmgr
|-mysqld---18*[{mysqld}]
|-nginx---4*[nginx]
|-php5-fpm---2*[php5-fpm]
|-puppet---{puppet}
|-rpc.idmapd
|-rpc.statd
|-rpcbind
|-rsyslogd---3*[{rsyslogd}]
|-ruby---{ruby}
|-sshd-+-3*[sshd---sshd---sftp-server]
| |-sshd---sshd---2*[sftp-server]
| |-sshd---sshd---bash---pstree
| `-sshd---sshd---bash---python-+-323*[echo]
| `-5*[python]
|-systemd-logind
|-systemd-udevd
|-upstart-file-br
|-upstart-socket-
`-upstart-udev-br
task.react (python pyerr.py -l400 -t):
init-+-VBoxService---7*[{VBoxService}]
|-acpid
|-atd
|-cron
|-dbus-daemon
|-dhclient
|-6*[getty]
|-master-+-pickup
| `-qmgr
|-mysqld---18*[{mysqld}]
|-nginx---4*[nginx]
|-php5-fpm---2*[php5-fpm]
|-puppet---{puppet}
|-rpc.idmapd
|-rpc.statd
|-rpcbind
|-rsyslogd---3*[{rsyslogd}]
|-ruby---{ruby}
|-sshd-+-3*[sshd---sshd---sftp-server]
| |-sshd---sshd---2*[sftp-server]
| |-sshd---sshd---bash---pstree
| `-sshd---sshd---bash---python---echo
|-systemd-logind
|-systemd-udevd
|-upstart-file-br
|-upstart-socket-
`-upstart-udev-br
注意这两者的区别
| `-sshd---sshd---bash---python-+-323*[echo]
| `-5*[python]
还有这个
| `-sshd---sshd---bash---python---echo
在一个 cas 中,进程似乎不会在完成后立即关闭。
我已经在 4 台不同的机器上测试过这个问题:
- Ubuntu 14.04
- 森托斯 6
- Centos 7
问题完全一样。
要试一试,请尝试 运行 watch -n 0.1 "pstree"
以了解流程如何演变。
编辑:由于 Glyph 的回答,我明白了为什么会发生这种情况,但是如何使它适应我的现实生活案例?
我用 Twisted 开发的应用程序是一个基于 Milter 的 SMTP 过滤器,这里是它的工作原理(假设我们要检查电子邮件签名):
- 连接在端口 25 上打开
- milter 协议获取所有电子邮件详细信息
- milter 调用远程 "module" 服务器,该服务器将使用
/usr/bin/openssl mime
调用 处理签名检查
- 模块将return一个表明签名是否有效的答案
在这种情况下,我的问题是我有 150 个同时连接,将有 150 次调用模块(TCP 协议),并且此模块将在每个连接中调用一次 openssl 命令。
该模块是完全不可知的,因此不知道其他调用是否正在 运行ning。在你看来我应该把 DeferredSemaphore
放在哪里?
我的问题是 smtp 连接也是不可知的,不知道其他可能打开的连接。
您认为处理这种并行性的正确方法是什么?
这里的问题与 task.react
和 reactor.run
之间的区别无关,而是 run
和 [=14] 的实现之间细微但重要的区别=] 函数。
不同之处在于 run
正在生成 limit
个进程 并行 ,累积了数千个同时打开的文件描述符,很容易突破系统的限制.但是,main
甚至在启动下一个进程之前等待每个进程完全执行完毕,这意味着它一次使用的进程永远不会超过 4 或 5 个。
原因是 main
被 inlineCallbacks
装饰并且产生每个 getProcessOutputAndValue
Deferred
,它暂停执行 main
直到 Deferred
已完成。
在实际应用中,这两种方式都不理想。你想要一些并行性,但不是无限的。 Twisted 附带了一些实用程序,例如 DeferredSemaphore
, to facilitate limited parallelism without restricting everything to only run one task at a time. Jean-Paul Calderone wrote an article - 10 years ago! - that explains how to use this, here.
但是,为了证明该问题与 task.react
无关,这里是您示例的修改版本,它删除了 run
函数并使用main
:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from progress.bar import IncrementalBar
from twisted.internet import defer
from twisted.internet import task
from twisted.internet import utils
from twisted.python import usage
class Options(usage.Options):
optFlags = [['reactor', 'r', 'Use reactor.run().'],
['task', 't', 'Use task.react().'],
['cwr', 'w', 'Use callWhenRunning().']]
optParameters = [['limit', 'l', 255, 'Number of file descriptors to open.'],
['cmd', 'c', 'echo Testing {i}...', 'Command to run.']]
@defer.inlineCallbacks
def main(reactor, opt):
d = defer.Deferred()
limit = int(opt['limit'])
cmd, args = opt['cmd'].split(' ', 1)
bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit)
for i in range(0, limit):
try:
_args = args.format(i=i)
args = _args
except KeyError:
pass
yield utils.getProcessOutputAndValue('echo', [args])
bar.next()
bar.finish()
defer.returnValue(d.callback(True))
if __name__ == '__main__':
opt = Options()
opt.parseOptions()
if opt['reactor']:
from twisted.internet import reactor
task.deferLater(reactor, 0, main, reactor, opt)
reactor.run()
elif opt['task']:
from twisted.internet.task import react
react(main, [opt])
elif opt['cwr']:
from twisted.internet import reactor
reactor.callWhenRunning(main, reactor, opt)
reactor.run()
编辑,回复问题中的编辑:
由于您的 真正的 问题是传入连接,而不仅仅是 for
循环,而不是使用 DeferredSemaphore
,您可能需要维护一个计数器,并利用对象从 listenTCP
, or the result of the Deferred
that comes back from TCP4ServerEndpoint
, implements IPushProducer
返回的事实,并在太多并发连接正在工作时调用 pauseProducing()
,并在工作完成时调用 resumeProducing()
。