OSError: [Errno 24] Too many open files when using reactor.run() in Twisted

OSError: [Errno 24] Too many open files when using reactor.run() in Twisted

我遇到了一个奇怪的问题:我正在 运行 执行大量 utils.getProcessOutputAndValue('cmd', [args]) 命令,结果取决于我是使用 task.react() 还是 [=18] 启动反应堆=]

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from progress.bar import IncrementalBar
from twisted.internet import defer
from twisted.internet import task
from twisted.internet import utils
from twisted.python import usage


class Options(usage.Options):
    optFlags = [['reactor', 'r', 'Use reactor.run().'],
                ['task', 't', 'Use task.react().'],
                ['cwr', 'w', 'Use callWhenRunning().']]
    optParameters = [['limit', 'l', 255, 'Number of file descriptors to open.'],
                     ['cmd', 'c', 'echo Testing {i}...', 'Command to run.']]


def run(opt):
    limit = int(opt['limit'])
    cmd, args = opt['cmd'].split(' ', 1)
    bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit)
    requests = []
    for i in range(0, limit):
        try:
            _args = args.format(i=i)
            args = _args
        except KeyError:
            pass
        requests.append(utils.getProcessOutputAndValue('echo', [args]))
        bar.next()
    bar.finish()
    return defer.gatherResults(requests)


@defer.inlineCallbacks
def main(reactor, opt):
    d = defer.Deferred()
    limit = int(opt['limit'])
    cmd, args = opt['cmd'].split(' ', 1)
    bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit)
    for i in range(0, limit):
        try:
            _args = args.format(i=i)
            args = _args
        except KeyError:
            pass
        yield utils.getProcessOutputAndValue('echo', [args])
        bar.next()
    bar.finish()
    defer.returnValue(d.callback(True))


if __name__ == '__main__':
    opt = Options()
    opt.parseOptions()

    if opt['reactor']:
        from twisted.internet import reactor
        task.deferLater(reactor, 0, run, opt)
        reactor.run()

    elif opt['task']:
        from twisted.internet.task import react
        react(main, [opt])

    elif opt['cwr']:
        from twisted.internet import reactor
        reactor.callWhenRunning(run, opt)
        reactor.run()

在 400 以上使用 limit 时(在我的例子中)出现以下错误:

Upon execvpe echo ['echo', 'Testing 0...'] in environment id 42131264
:Traceback (most recent call last):
  File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 428, in _fork
    self._setupChild(**kwargs)
  File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 803, in _setupChild
    for fd in _listOpenFDs():
  File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 638, in _listOpenFDs
    return detector._listOpenFDs()
  File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 553, in _listOpenFDs
    self._listOpenFDs = self._getImplementation()
  File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 576, in _getImplementation
    after = impl()
  File "/home/vagrant/.env/sm/lib/python2.7/site-packages/Twisted-15.5.0-py2.7-linux-x86_64.egg/twisted/internet/process.py", line 606, in _procFDImplementation
    return [int(fd) for fd in self.listdir(dname)]
OSError: [Errno 24] Too many open files: '/proc/23421/fd'
Unhandled error in Deferred:

如果我使用task.react()

不会发生

简历中:

问题是我有一个使用 reactor 的大应用程序,因为它是一个响应 SMTP 连接的应用程序(所以不能使用 task.react,我不想停止 reactor)。

我一直认为 task.react 只是在延迟完成后才停止反应器,但我想做的不止这些...


edit:这里是 pstree 比较 task.reactreactor.run

reactor.run (python pyerr.py -l400 -r):

init-+-VBoxService---7*[{VBoxService}]
     |-acpid
     |-atd
     |-cron
     |-dbus-daemon
     |-dhclient
     |-6*[getty]
     |-master-+-pickup
     |        `-qmgr
     |-mysqld---18*[{mysqld}]
     |-nginx---4*[nginx]
     |-php5-fpm---2*[php5-fpm]
     |-puppet---{puppet}
     |-rpc.idmapd
     |-rpc.statd
     |-rpcbind
     |-rsyslogd---3*[{rsyslogd}]
     |-ruby---{ruby}
     |-sshd-+-3*[sshd---sshd---sftp-server]
     |      |-sshd---sshd---2*[sftp-server]
     |      |-sshd---sshd---bash---pstree
     |      `-sshd---sshd---bash---python-+-323*[echo]
     |                                    `-5*[python]
     |-systemd-logind
     |-systemd-udevd
     |-upstart-file-br
     |-upstart-socket-
     `-upstart-udev-br

task.react (python pyerr.py -l400 -t):

init-+-VBoxService---7*[{VBoxService}]
     |-acpid
     |-atd
     |-cron
     |-dbus-daemon
     |-dhclient
     |-6*[getty]
     |-master-+-pickup
     |        `-qmgr
     |-mysqld---18*[{mysqld}]
     |-nginx---4*[nginx]
     |-php5-fpm---2*[php5-fpm]
     |-puppet---{puppet}
     |-rpc.idmapd
     |-rpc.statd
     |-rpcbind
     |-rsyslogd---3*[{rsyslogd}]
     |-ruby---{ruby}
     |-sshd-+-3*[sshd---sshd---sftp-server]
     |      |-sshd---sshd---2*[sftp-server]
     |      |-sshd---sshd---bash---pstree
     |      `-sshd---sshd---bash---python---echo
     |-systemd-logind
     |-systemd-udevd
     |-upstart-file-br
     |-upstart-socket-
     `-upstart-udev-br

注意这两者的区别

 |      `-sshd---sshd---bash---python-+-323*[echo]
 |                                    `-5*[python]

还有这个

 |      `-sshd---sshd---bash---python---echo

在一个 cas 中,进程似乎不会在完成后立即关闭。

我已经在 4 台不同的机器上测试过这个问题:

问题完全一样。

要试一试,请尝试 运行 watch -n 0.1 "pstree" 以了解流程如何演变。


编辑:由于 Glyph 的回答,我明白了为什么会发生这种情况,但是如何使它适应我的现实生活案例?

我用 Twisted 开发的应用程序是一个基于 Milter 的 SMTP 过滤器,这里是它的工作原理(假设我们要检查电子邮件签名):

在这种情况下,我的问题是我有 150 个同时连接,将有 150 次调用模块(TCP 协议),并且此模块将在每个连接中调用一次 openssl 命令。

该模块是完全不可知的,因此不知道其他调用是否正在 运行ning。在你看来我应该把 DeferredSemaphore 放在哪里?

我的问题是 smtp 连接也是不可知的,不知道其他可能打开的连接。

您认为处理这种并行性的正确方法是什么?

这里的问题与 task.reactreactor.run 之间的区别无关,而是 run 和 [=14] 的实现之间细微但重要的区别=] 函数。

不同之处在于 run 正在生成 limit 个进程 并行 ,累积了数千个同时打开的文件描述符,很容易突破系统的限制.但是,main 甚至在启动下一个进程之前等待每个进程完全执行完毕,这意味着它一次使用的进程永远不会超过 4 或 5 个。

原因是 maininlineCallbacks 装饰并且产生每个 getProcessOutputAndValue Deferred,它暂停执行 main 直到 Deferred 已完成。

在实际应用中,这两种方式都不理想。你想要一些并行性,但不是无限的。 Twisted 附带了一些实用程序,例如 DeferredSemaphore, to facilitate limited parallelism without restricting everything to only run one task at a time. Jean-Paul Calderone wrote an article - 10 years ago! - that explains how to use this, here.

但是,为了证明该问题与 task.react 无关,这里是您示例的修改版本,它删除了 run 函数并使用main:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from progress.bar import IncrementalBar
from twisted.internet import defer
from twisted.internet import task
from twisted.internet import utils
from twisted.python import usage


class Options(usage.Options):
    optFlags = [['reactor', 'r', 'Use reactor.run().'],
                ['task', 't', 'Use task.react().'],
                ['cwr', 'w', 'Use callWhenRunning().']]
    optParameters = [['limit', 'l', 255, 'Number of file descriptors to open.'],
                     ['cmd', 'c', 'echo Testing {i}...', 'Command to run.']]


@defer.inlineCallbacks
def main(reactor, opt):
    d = defer.Deferred()
    limit = int(opt['limit'])
    cmd, args = opt['cmd'].split(' ', 1)
    bar = IncrementalBar('Running {cmd}'.format(cmd=opt['cmd']), max=limit)
    for i in range(0, limit):
        try:
            _args = args.format(i=i)
            args = _args
        except KeyError:
            pass
        yield utils.getProcessOutputAndValue('echo', [args])
        bar.next()
    bar.finish()
    defer.returnValue(d.callback(True))


if __name__ == '__main__':
    opt = Options()
    opt.parseOptions()

    if opt['reactor']:
        from twisted.internet import reactor
        task.deferLater(reactor, 0, main, reactor, opt)
        reactor.run()

    elif opt['task']:
        from twisted.internet.task import react
        react(main, [opt])

    elif opt['cwr']:
        from twisted.internet import reactor
        reactor.callWhenRunning(main, reactor, opt)
        reactor.run()

编辑,回复问题中的编辑:

由于您的 真正的 问题是传入连接,而不仅仅是 for 循环,而不是使用 DeferredSemaphore,您可能需要维护一个计数器,并利用对象从 listenTCP, or the result of the Deferred that comes back from TCP4ServerEndpoint, implements IPushProducer 返回的事实,并在太多并发连接正在工作时调用 pauseProducing(),并在工作完成时调用 resumeProducing()