asyncio.gather 有选择性 return_exceptions

asyncio.gather with selective return_exceptions

我希望 asyncio.gather 立即引发除某些特定异常 class 之外的任何异常,该异常应该在结果列表中返回。现在,我只是稍微修改了 CPython 中 asyncio.gather 的规范实现并使用了它,但我想知道是否没有更规范的方法来做到这一点。

您可以使用更强大的 asyncio.wait 原语及其 return_when=asyncio.FIRST_EXCEPTION 选项来实现此类语义:

async def xgather(*coros, allowed_exc):
    results = {}
    pending = futures = list(map(asyncio.ensure_future, coros))
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_EXCEPTION)
        for fut in done:
            try:
                results[fut] = fut.result()
            except allowed_exc as e:
                results[fut] = e
    return [results[fut] for fut in futures]

我们的想法是调用 wait,直到完成所有期货或观察到异常。异常依次存储或传播,具体取决于它是否匹配 allowed_exc。如果已成功收集所有结果和允许的异常,它们将以正确的顺序返回,如 asyncio.gather.

修改 asyncio.gather 实现的方法可能很容易在更新的 Python 版本上失败,因为代码访问 Future 对象的私有属性。此外,像 uvloop 这样的替代事件循环可以使它们的 gatherwait 更有效率,这将自动使基于 public API 的 xgather 受益].

测试代码:

import asyncio

async def fail():
    1/0

async def main():
    print(await xgather(asyncio.sleep(1), fail(), allowed_exc=OSError))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

当 运行 时,代码会立即引发,这是预期的 ZeroDivisionError 与允许的 OSError 异常不匹配。将 OSError 更改为 ZeroDivisionError 会导致代码休眠 1 秒并输出 [None, ZeroDivisionError('division by zero',)].