asyncio.gather 有选择性 return_exceptions
asyncio.gather with selective return_exceptions
我希望 asyncio.gather
立即引发除某些特定异常 class 之外的任何异常,该异常应该在结果列表中返回。现在,我只是稍微修改了 CPython 中 asyncio.gather
的规范实现并使用了它,但我想知道是否没有更规范的方法来做到这一点。
您可以使用更强大的 asyncio.wait
原语及其 return_when=asyncio.FIRST_EXCEPTION
选项来实现此类语义:
async def xgather(*coros, allowed_exc):
results = {}
pending = futures = list(map(asyncio.ensure_future, coros))
while pending:
done, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_EXCEPTION)
for fut in done:
try:
results[fut] = fut.result()
except allowed_exc as e:
results[fut] = e
return [results[fut] for fut in futures]
我们的想法是调用 wait
,直到完成所有期货或观察到异常。异常依次存储或传播,具体取决于它是否匹配 allowed_exc
。如果已成功收集所有结果和允许的异常,它们将以正确的顺序返回,如 asyncio.gather
.
修改 asyncio.gather
实现的方法可能很容易在更新的 Python 版本上失败,因为代码访问 Future
对象的私有属性。此外,像 uvloop
这样的替代事件循环可以使它们的 gather
和 wait
更有效率,这将自动使基于 public API 的 xgather
受益].
测试代码:
import asyncio
async def fail():
1/0
async def main():
print(await xgather(asyncio.sleep(1), fail(), allowed_exc=OSError))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
当 运行 时,代码会立即引发,这是预期的 ZeroDivisionError
与允许的 OSError
异常不匹配。将 OSError
更改为 ZeroDivisionError
会导致代码休眠 1 秒并输出 [None, ZeroDivisionError('division by zero',)]
.
我希望 asyncio.gather
立即引发除某些特定异常 class 之外的任何异常,该异常应该在结果列表中返回。现在,我只是稍微修改了 CPython 中 asyncio.gather
的规范实现并使用了它,但我想知道是否没有更规范的方法来做到这一点。
您可以使用更强大的 asyncio.wait
原语及其 return_when=asyncio.FIRST_EXCEPTION
选项来实现此类语义:
async def xgather(*coros, allowed_exc):
results = {}
pending = futures = list(map(asyncio.ensure_future, coros))
while pending:
done, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_EXCEPTION)
for fut in done:
try:
results[fut] = fut.result()
except allowed_exc as e:
results[fut] = e
return [results[fut] for fut in futures]
我们的想法是调用 wait
,直到完成所有期货或观察到异常。异常依次存储或传播,具体取决于它是否匹配 allowed_exc
。如果已成功收集所有结果和允许的异常,它们将以正确的顺序返回,如 asyncio.gather
.
修改 asyncio.gather
实现的方法可能很容易在更新的 Python 版本上失败,因为代码访问 Future
对象的私有属性。此外,像 uvloop
这样的替代事件循环可以使它们的 gather
和 wait
更有效率,这将自动使基于 public API 的 xgather
受益].
测试代码:
import asyncio
async def fail():
1/0
async def main():
print(await xgather(asyncio.sleep(1), fail(), allowed_exc=OSError))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
当 运行 时,代码会立即引发,这是预期的 ZeroDivisionError
与允许的 OSError
异常不匹配。将 OSError
更改为 ZeroDivisionError
会导致代码休眠 1 秒并输出 [None, ZeroDivisionError('division by zero',)]
.