通过具有特定信号的 asyncio 终止外部程序 运行

Terminate external program run through asyncio with specific signal

我需要从具有特定信号的 asyncio Python 脚本中 运行 终止外部程序,例如 SIGTERM。我的问题是程序总是收到 SIGINT,即使我向它们发送 SIGTERM 信号也是如此。

这是一个测试用例,可以找到下面测试中使用的fakeprg的源代码here

import asyncio
import traceback
import os
import os.path
import sys
import time
import signal
import shlex

from functools import partial


class ExtProgramRunner:
    run = True
    processes = []

    def __init__(self):
        pass

    def start(self, loop):
        self.current_loop = loop
        self.current_loop.add_signal_handler(signal.SIGINT, lambda: asyncio.async(self.stop('SIGINT')))
        self.current_loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.async(self.stop('SIGTERM')))
        asyncio.async(self.cancel_monitor())
        asyncio.Task(self.run_external_programs())

    @asyncio.coroutine
    def stop(self, sig):
        print("Got {} signal".format(sig))
        self.run = False
        for process in self.processes:
            print("sending SIGTERM signal to the process with pid {}".format(process.pid))
            process.send_signal(signal.SIGTERM)
        print("Canceling all tasks")
        for task in asyncio.Task.all_tasks():
            task.cancel()

    @asyncio.coroutine
    def cancel_monitor(self):
        while True:
            try:
                yield from asyncio.sleep(0.05)
            except asyncio.CancelledError:
                break
        print("Stopping loop")
        self.current_loop.stop()

    @asyncio.coroutine
    def run_external_programs(self):
        os.makedirs("/tmp/files0", exist_ok=True)
        os.makedirs("/tmp/files1", exist_ok=True)
        # schedule tasks for execution
        asyncio.Task(self.run_cmd_forever("/tmp/fakeprg /tmp/files0 1000"))
        asyncio.Task(self.run_cmd_forever("/tmp/fakeprg /tmp/files1 5000"))

    @asyncio.coroutine
    def run_cmd_forever(self, cmd):
        args = shlex.split(cmd)
        while self.run:
            process = yield from asyncio.create_subprocess_exec(*args)
            self.processes.append(process)
            exit_code = yield from process.wait()
            for idx, p in enumerate(self.processes):
                if process.pid == p.pid:
                    self.processes.pop(idx)
            print("External program '{}' exited with exit code {}, relauching".format(cmd, exit_code))


def main():
    loop = asyncio.get_event_loop()

    try:
        daemon = ExtProgramRunner()
        loop.call_soon(daemon.start, loop)

        # start main event loop
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError as exc:
        print("asyncio.CancelledError")
    except Exception as exc:
        print(exc, file=sys.stderr)
        print("====", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
    finally:
        print("Stopping daemon...")
        loop.close()


if __name__ == '__main__':
    main()

原因是:当你启动你的 python 程序(父)并启动它的进程 /tmp/fakeprg(子)时,它们会得到所有不同的进程及其 pid,但它们都 run in the same foreground process group。您的 shell 已绑定到该组,因此当您点击 Ctrl-C (SIGINT)、Ctrl-Y (SIGTSTP) 或 Ctrl-\ (SIGQUIT) 它们被发送到前台进程组中的所有进程

在你的代码中,这发生在父进程甚至可以通过 send_signal 向其子进程发送信号之前,所以这一行向已经死掉的进程发送信号(并且应该失败,所以 IMO 这是一个问题异步)。

为了解决这个问题,您可以明确地将您的子进程放入一个单独的进程组中,如下所示:

asyncio.create_subprocess_exec(*args, preexec_fn=os.setpgrp)