当 asyncio 任务正在执行_blocking_ 工作时,如何在键盘中断时正常关闭?

How to shutdown gracefully on keyboard interrupt when an asyncio task is performing _blocking_ work?

更新:asyncio 只是按照它的指示进行操作,您可以很好地处理这些异常 - 请参阅我已标记为该问题的解决方案的后续答案。下面是原始问题,稍微修改了示例以阐明问题及其解决方案。

我一直在尝试调试我正在处理的严重依赖 asyncio 的库。在处理一些示例代码时,我意识到执行键盘中断 (CTRL-C) 有时(很少!)会触发可怕的...

Task exception was never retrieved

我努力确保我分拆的所有任务都能正常处理 asyncio.CancelledError,在花了太多时间调试之后,我意识到我只 如果其中一个 asyncio 任务卡在阻塞操作上,则会出现此错误消息。

封锁?你真的不应该在任务中执行阻塞工作——这就是为什么 asyncio 很友善地警告你这一点。 运行 下面的代码...

import asyncio
from time import sleep


async def possibly_dangerous_sleep(i: int, use_blocking_sleep: bool = True):

    try:
        print(f"Sleep #{i}: Fine to cancel me within the next 2 seconds")
        await asyncio.sleep(2)
        if use_blocking_sleep:
            print(
                f"Sleep #{i}: Not fine to cancel me within the next 10 seconds UNLESS someone is"
                " awaiting me, e.g. asyncio.gather()"
            )
            sleep(10)
        else:
            print(f"Sleep #{i}: Will sleep using asyncio.sleep(), nothing to see here")
            await asyncio.sleep(10)
        print(f"Sleep #{i}: Fine to cancel me now")
        await asyncio.sleep(2)
    except asyncio.CancelledError:
        print(f"Sleep #{i}: So, I got cancelled...")
        raise


def done_cb(task: asyncio.Task):
    name = task.get_name()
    try:
        task.exception()
    except asyncio.CancelledError:
        print(f"Done: Task {name} was cancelled")
        pass
    except Exception as e:
        print(f"Done: Task {name} didn't handle exception { e }")
    else:
        print(f"Done: Task {name} is simply done")


async def start_doing_stuff(collect_exceptions_when_gathering: bool = False):

    tasks = []
    for i in range(1, 7):
        task = asyncio.create_task(
            possibly_dangerous_sleep(i, use_blocking_sleep=True), name=str(i)
        )
        task.add_done_callback(done_cb)
        tasks.append(task)

    # await asyncio.sleep(3600)

    results = await asyncio.gather(*tasks, return_exceptions=collect_exceptions_when_gathering)


if __name__ == "__main__":
    try:
        asyncio.run(start_doing_stuff(collect_exceptions_when_gathering=False), debug=True)
    except KeyboardInterrupt:
        print("User aborted through keyboard")

...调试控制台会告诉您一些信息:

Executing <Task finished name='Task-2' coro=<possibly_dangerous_sleep() done, defined at ~/src/hej.py:5> result=None created at ~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/tasks.py:337> took 10.005 seconds

请放心,上面对 sleep(10) 的调用并不是我正在处理的库中的罪魁祸首,但它说明了我 运行 遇到的问题:如果我尝试在前 2 到 12 秒内中断上述测试应用程序 运行,调试控制台将以大量源代码回溯结束:

Fine to cancel me within the next 2 seconds
Not fine to cancel me within the next 10 seconds UNLESS someone is awaiting me, e.g. asyncio.gather()
^CDone with: <Task finished name='Task-2' coro=<possibly_dangerous_sleep() done, defined at ~/src/hej.py:5> exception=KeyboardInterrupt() created at ~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/tasks.py:337>
User aborted through keyboard
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<dangerous_sleep() done, defined at ~/src/hej.py:5> exception=KeyboardInterrupt() created at ~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/tasks.py:337>
source_traceback: Object created at (most recent call last):
  File "~/.pyenv/versions/3.10.0/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "~/.pyenv/versions/3.10.0/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "~/.vscode/extensions/ms-python.python-2021.12.1559732655/pythonFiles/lib/python/debugpy/__main__.py", line 45, in <module>
    cli.main()
  File "~/.vscode/extensions/ms-python.python-2021.12.1559732655/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py", line 444, in main
    run()
  File "~/.vscode/extensions/ms-python.python-2021.12.1559732655/pythonFiles/lib/python/debugpy/../debugpy/server/cli.py", line 285, in run_file
    runpy.run_path(target_as_str, run_name=compat.force_str("__main__"))
  File "~/.pyenv/versions/3.10.0/lib/python3.10/runpy.py", line 269, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "~/.pyenv/versions/3.10.0/lib/python3.10/runpy.py", line 96, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "~/.pyenv/versions/3.10.0/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "~/src/hej.py", line 37, in <module>
    asyncio.run(start_doing_stuff(), debug=True)
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/base_events.py", line 628, in run_until_complete
    self.run_forever()
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/base_events.py", line 595, in run_forever
    self._run_once()
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/base_events.py", line 1873, in _run_once
    handle._run()
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "~/src/hej.py", line 28, in start_doing_stuff
    task = asyncio.create_task(dangerous_sleep())
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/tasks.py", line 337, in create_task
    task = loop.create_task(coro)
Traceback (most recent call last):
  File "~/src/hej.py", line 37, in <module>
    asyncio.run(start_doing_stuff(), debug=True)
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/base_events.py", line 628, in run_until_complete
    self.run_forever()
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/base_events.py", line 595, in run_forever
    self._run_once()
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/base_events.py", line 1873, in _run_once
    handle._run()
  File "~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "~/src/hej.py", line 14, in dangerous_sleep
    sleep(10)
KeyboardInterrupt

如果我将 await asyncio.sleep(3600) 替换为 await asyncio.gather(task)(参见示例代码)并调用 CTRL-C,我会在我的调试控制台中得到一个非常整洁的关闭序列:

Fine to cancel me within the next 2 seconds
Not fine to cancel me within the next 10 seconds UNLESS someone is awaiting me, e.g. asyncio.gather()
^CDone with: <Task finished name='Task-2' coro=<possibly_dangerous_sleep() done, defined at ~/src/hej.py:5> exception=KeyboardInterrupt() created at ~/.pyenv/versions/3.10.0/lib/python3.10/asyncio/tasks.py:337>
User aborted through keyboard

如果这是设计使然,有人可以向我解释一下吗?当 asyncio.run() 被中断时(同时自行清理),我期待 all asyncio 任务为我取消。

总结:你需要处理你的异常,否则 asyncio 会报错。

对于后台任务(即您没有明确等待使用 gather() 的任务)

您可能认为尝试在您的任务中使用 except asyncio.CancelledError(并重新提出)来捕获取消将处理所有类型的取消。事实并非如此。如果您的任务在被取消时正在执行阻塞工作,您将无法在任务本身内捕获异常(例如 KeyboardInterrupt)。安全的做法是在 asyncio.Task 上使用 add_done_callback 注册完成回调。在此回调中,检查是否存在异常(请参阅问题中更新的示例代码)。如果您的任务在被取消时卡在阻止工作中,完成回调将告诉您任务已完成(相对于已取消)。

对于您等待的一堆任务,使用 gather()

如果使用收集,则不需要添加完成回调。相反,要求它 return 任何异常,它会处理 KeyboardInterrupt 就好了。如果您不这样做,在其任何可等待对象中引发的第一个异常将立即传播到等待 gather() 的任务。如果任务中的 KeyboardInterrupt 卡在阻塞工作中,KeyboardInterrupt 将被重新引发,您需要处理它。或者,使用 try/except 来处理引发的任何异常。请通过在示例代码中设置 collect_exceptions_when_gathering 变量来自己尝试。

最后:只有我现在不明白的是,如果一个任务调用gather(),我没有看到任何异常被引发,不要求它 return 异常。尝试修改示例代码,使其范围为 range(1,2),这样您就不会在 CTRL-C 上看到混乱的堆栈跟踪...?