由于超时,如何从已取消的 python asyncio 协程中 return 一个值

How to return a value from a cancelled python asyncio coroutine due to timeout

在 python > 3.5 中,如何让协程 return 在因 TimeoutError 被取消后获得最终值?

我有一个小型 python 项目,它使用多个协程来传输 运行 数据并报告传输的数据量 t运行。它需要一个超时参数;如果脚本在完成 t运行sfer 之前超时,它会报告它在取消之前 t运行sferred 的数量。

它在 python3.5 中运行良好,但最近我尝试更新到 3.8 并且 运行 遇到了麻烦。

下面是示例代码,很明显它的行为与 3.5、3.6、3.7 和 3.8 有很大不同:

import asyncio
import sys


async def foo():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("foo got cancelled")
        return 1


async def main():
    coros = asyncio.gather(*(foo() for _ in range(3)))
    try:
        await asyncio.wait_for(coros, timeout=0.1)
    except asyncio.TimeoutError:
        print("main coroutine timed out")
        await coros
    return coros.result()


if __name__ == "__main__":
    print(sys.version)

    loop = asyncio.new_event_loop()
    try:
        results = loop.run_until_complete(main())
        print("results: {}".format(results))
    except Exception as e:
        print("exception in __main__:")
        print(e)
    finally:
        loop.close()
$ for ver in 3.5 3.6 3.7 3.8; do echo; python${ver} example.py; done

3.5.7 (default, Sep  6 2019, 07:49:56)
[GCC 4.2.1 Compatible Apple LLVM 10.0.1 (clang-1001.0.46.4)]
main coroutine timed out
foo got cancelled
foo got cancelled
foo got cancelled
results: [1, 1, 1]

3.6.9 (default, Sep  6 2019, 07:45:14)
[GCC 4.2.1 Compatible Apple LLVM 10.0.1 (clang-1001.0.46.4)]
main coroutine timed out
foo got cancelled
foo got cancelled
foo got cancelled
exception in __main__:


3.7.4 (default, Sep 17 2019, 13:46:30)
[Clang 10.0.1 (clang-1001.0.46.4)]
foo got cancelled
foo got cancelled
foo got cancelled
main coroutine timed out
exception in __main__:


3.8.0 (default, Oct 16 2019, 21:30:17)
[Clang 11.0.0 (clang-1100.0.33.8)]
foo got cancelled
foo got cancelled
foo got cancelled
main coroutine timed out
Traceback (most recent call last):
  File "example.py", line 28, in <module>
    results = loop.run_until_complete(main())
  File "/usr/local/var/pyenv/versions/3.8.0/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
asyncio.exceptions.CancelledError

exception in __main__: 没有为 3.8 打印,因为 CancelledError 现在是 BaseException 而不是 Exception (编辑:这可能是回溯打印在这里而不是其他地方的原因) .

我已经尝试了一些在asyncio.gather中使用return_exceptions=True或在except asyncio.TimeoutError:块中捕获CancelledError的配置,但我似乎无法理解对。

我需要将 main 保留为异步函数,因为在我的实际代码中,它正在创建一个 aiohttp 会话供其他协程共享,而现代 aiohttp 要求在异步上下文管理器中完成此操作(而不是常规的同步上下文管理器)。

我希望代码能在 3.5-3.8 上运行,所以我没有使用 asyncio.run

我已经尝试了一些其他问题的代码,这些问题使用 .cancel() 有或没有 contextlib.suppress(asyncio.CancelledError),但仍然没有运气。我也试过 return 一个等待的值(例如 result = await coros; return result 而不是 return coros.result()),也没有骰子。

有什么好的方法可以让我在 python >3.5 中获得 python 3.5 行为,在这种情况下我可以让协程在超时时捕获 CancelledError 和 return 下次等待时的值?

提前致谢。

我进行了一些调试,看起来在 asyncio.gather 取消的情况下从未设置结果,因此无法从 python 3.8 中的 _GatheringFuture 对象检索它。

asyncio/tasks.py:792

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                outer.set_exception(exceptions.CancelledError())
            else:
                outer.set_result(results)

阅读文档后我发现了关于 asyncio.CancelledError:

In almost all situations the exception must be re-raised.

Imo,python 3.5 的行为是无意的。我不会依赖它。

虽然可以通过不使用 asyncio.gather 来解决这个问题,但不值得付出努力。如果您确实需要从已取消的协程中获取部分结果,则只需将其添加到某个全局列表即可:

    except asyncio.CancelledError:
        print("foo got cancelled")
        global_results.append(1)
        raise

感谢@RafalS 和他们停止使用 asyncio.gather 的建议。

与其使用 gatherwait_for,似乎直接将 .wait 的超时与协程一起使用可能是最好的选择,并且适用于 3.5 到 3.8。

请注意,下面的 bash 命令略有修改,以显示任务正在 运行 同时取消,而不等待 foo 完成。

import asyncio
import sys


async def foo():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        pass
    finally:
        return 1


async def main():
    coros = [foo() for _ in range(3)]
    done, pending = await asyncio.wait(coros, timeout=1.0)
    for task in pending:
        task.cancel()
        await task
    return [task.result() for task in done | pending]


if __name__ == "__main__":
    print(sys.version)

    loop = asyncio.new_event_loop()
    try:
        results = loop.run_until_complete(main())
        print("results: {}".format(results))
    finally:
        loop.close()
$ for ver in 3.5 3.6 3.7 3.8; do echo; time python${ver} example.py; done

3.5.7 (default, Sep  6 2019, 07:49:56)
[GCC 4.2.1 Compatible Apple LLVM 10.0.1 (clang-1001.0.46.4)]
results: [1, 1, 1]

real    0m1.634s
user    0m0.173s
sys     0m0.106s

3.6.9 (default, Sep  6 2019, 07:45:14)
[GCC 4.2.1 Compatible Apple LLVM 10.0.1 (clang-1001.0.46.4)]
results: [1, 1, 1]

real    0m1.643s
user    0m0.184s
sys     0m0.100s

3.7.4 (default, Sep 17 2019, 13:46:30)
[Clang 10.0.1 (clang-1001.0.46.4)]
results: [1, 1, 1]

real    0m1.499s
user    0m0.129s
sys     0m0.089s

3.8.0 (default, Oct 16 2019, 21:30:17)
[Clang 11.0.0 (clang-1100.0.33.8)]
results: [1, 1, 1]

real    0m1.492s
user    0m0.141s
sys     0m0.087s