使用 aiohttp 嵌套 "async with"

nested "async with" using aiohttp

我想创建一个使用 aiohttp 进行 API 调用的调度程序 class。我试过这个:

import asyncio
import aiohttp

class MySession:
    def __init__(self):
        self.session = None

    async def __aenter__(self):
        async with aiohttp.ClientSession() as session:
            self.session = session
            return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

async def method1():
    async with MySession() as s:
        async with s.session.get("https://www.google.com") as resp:
            if resp.status == 200:
                print("successful call!")

loop = asyncio.get_event_loop()
loop.run_until_complete(method1())
loop.close()

但这只会导致错误:RuntimeError: Session is closed

__aenter__ 函数的第二种方法:

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

效果很好。这是一个好的构造吗?它不遵循如何使用 aiohttp 的示例。还想知道为什么第一种方法不起作用?

您不能在函数内部使用 with 并让上下文管理器保持打开状态,不行。 with with aiohttp.ClientSession() as session: 块在您使用 return 退出 __aenter__ 协程后立即退出!

对于特定情况,输入 aiohttp.ClientSession() 上下文管理器 does nothing but return self。所以对于那个类型,只需创建实例并将其存储在self.session,然后等待self.session.close()就足够了,是的。

嵌套异步上下文管理器的 general 模式是等待来自您自己的嵌套异步上下文管理器的 __aenter____aexit__ 方法方法(并可能传递异常信息):

class MySession:
    def __init__(self):
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        await self.session.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            return await self.session.__aexit__(exc_type, exc_val, exc_tb)

从技术上讲,在进入嵌套上下文管理器之前,您应该首先确保存在实际的 __aexit__ 属性:

class MySession:
    def __init__(self):
        self.session = None
        self._session_aexit = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        self._session_aexit = type(self.session).__aexit__
        await self.session.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            return await self._session_aexit.__aexit__(
                self.session, exc_type, exc_val, exc_tb)

参见official PEP that added the concept

您可以在外部管理该依赖项:

import asyncio
import aiohttp

class MySession:

    def __init__(self, client_session):
        self.session = client_session

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

async def method1():
    async with aiohttp.ClientSession() as client_session:
        async with MySession(client_session) as s:
            async with s.session.get("https://www.google.com") as resp:
                if resp.status == 200:
                    print("successful call!")

asyncio.run(method1())

async with 链变得太荒谬时,您可以使用 AsyncExitStack:

from contextlib import AsyncExitStack

async def method1():
    async with AsyncExitStack() as stack:
        client_session = await stack.enter_async_context(aiohttp.ClientSession())
        s = await stack.enter_async_context(MySession(client_session))
        async with s.session.get("https://www.google.com") as resp:
            if resp.status == 200:
                print("successful call!")