运行 asyncio.subprocess.Process 来自 Tornado RequestHandler

Running asyncio.subprocess.Process from Tornado RequestHandler

我正在尝试编写一个异步运行本地命令的 Tornado Web 应用程序,作为协程。这是精简的示例代码:

#! /usr/bin/env python3

import shlex
import asyncio
import logging

from tornado.web import Application, url, RequestHandler
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

logging.getLogger('asyncio').setLevel(logging.DEBUG)


async def run():
    command = "python3 /path/to/my/script.py"
    logging.debug('Calling command: {}'.format(command))
    process = asyncio.create_subprocess_exec(
        *shlex.split(command),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT
    )
    logging.debug('  - process created')

    result = await process
    stdout, stderr = result.communicate()
    output = stdout.decode()
    return output

def run_sync(self, path):
    command = "python3 /path/to/my/script.py"
    logging.debug('Calling command: {}'.format(command))
    try:
        result = subprocess.run(
            *shlex.split(command),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            check=True
        )
    except subprocess.CalledProcessError as ex:
        raise RunnerError(ex.output)
    else:
        return result.stdout


class TestRunner(RequestHandler):

    async def get(self):
        result = await run()
        self.write(result)

url_list = [
    url(r"/test", TestRunner),
]
HTTPServer(Application(url_list, debug=True)).listen(8080)
logging.debug("Tornado server started at port {}.".format(8080))
IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
IOLoop.instance().start()

当直接调用 /path/to/my/script.py 时,它会按预期执行。此外,当我将 TestHandler.get 实现为常规同步方法时(请参阅 run_sync),它会正确执行。但是,当 运行 上述应用程序并调用 /test 时,日志显示:

DEBUG:asyncio:Using selector: EpollSelector
DEBUG:asyncio:execute program 'python3' stdout=stderr=<pipe>
DEBUG:asyncio:process 'python3' created: pid 21835

然而,ps显示进程挂起:

$ ps -ef | grep 21835
berislav 21835 21834  0 19:19 pts/2    00:00:00 [python3] <defunct>

我感觉我没有实现正确的循环,或者我做错了,但是我看到的所有 examples 都展示了如何使用 asyncio.get_event_loop().run_until_complete(your_coro()),并且我找不到太多关于组合 asyncio 和 Tornado 的信息。欢迎所有建议!

由于单例 SIGCHLD 处理程序,子流程很棘手。在 asyncio 中,这意味着它们仅适用于 "main" 事件循环。如果将 tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop') 更改为 tornado.platform.asyncio.AsyncIOMainLoop().install(),则该示例有效。还需要进行一些其他清理工作;这是完整的代码:

#! /usr/bin/env python3

import shlex
import asyncio
import logging

import tornado.platform.asyncio
from tornado.web import Application, url, RequestHandler
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

logging.getLogger('asyncio').setLevel(logging.DEBUG)

async def run():
    command = "python3 /path/to/my/script.py"
    logging.debug('Calling command: {}'.format(command))
    process = await asyncio.create_subprocess_exec(
        *shlex.split(command),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT
    )
    logging.debug('  - process created')

    result = await process.wait()
    stdout, stderr = await process.communicate()
    output = stdout.decode()
    return output

tornado.platform.asyncio.AsyncIOMainLoop().install()
IOLoop.instance().run_sync(run)

另请注意,tornado 在 tornado.process.Subprocess 中有自己的子进程接口,因此如果这是您唯一需要 asyncio 的东西,请考虑改用 Tornado 版本。请注意,在同一进程中组合 Tornado 和 asyncio 的子进程接口可能会与 SIGCHLD 处理程序产生冲突,因此您应该选择其中之一,或者以不需要 SIGCHLD 处理程序的方式使用库(对于例如仅依赖 stdout/stderr 而不是进程的退出状态)。