如何在异步循环关闭之前等待对象的 __del__ 完成?

How can I wait for an object's __del__ to finish before the async loop closes?

我有一个 class,里面会有一个 aiohttp.ClientSession 对象。

通常当你使用

async with aiohttp.ClientSession() as session:  
   # some code

会话将关闭,因为调用了会话的 __aexit__ 方法。

我不能使用上下文管理器,因为我想在对象的整个生命周期内保持会话持久。

这个有效:

import asyncio
import aiohttp

class MyAPI:
    def __init__(self):
        self.session = aiohttp.ClientSession()

    def __del__(self):
        # Close connection when this object is destroyed
        print('In __del__ now')
        asyncio.shield(self.session.__aexit__(None, None, None))



async def main():
    api = MyAPI()

asyncio.run(main())

然而,如果在某些地方引发异常,事件循环会在 __aexit__ 方法完成之前关闭。 我该如何克服这个问题?

堆栈跟踪:

Traceback (most recent call last):
  File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 19, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 17, in main
    raise ValueError
ValueError
In __del__ now
Exception ignored in: <function MyAPI.__del__ at 0x7f49982c0e18>
Traceback (most recent call last):
  File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 11, in __del__
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 765, in shield
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 576, in ensure_future
  File "/usr/local/lib/python3.7/asyncio/events.py", line 644, in get_event_loop
RuntimeError: There is no current event loop in thread 'MainThread'.
sys:1: RuntimeWarning: coroutine 'ClientSession.__aexit__' was never awaited
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f49982c2e10>

不要使用 __del__ 挂钩来清理异步资源。你根本不能算它被调用,更不用说控制它何时被使用或者异步循环是否在那个时候仍然可用。你真的想明确地处理这个。

要么使 API 成为异步上下文管理器,要么在退出时使用 finally 处理程序显式清理资源,比如说; withasync with 语句基本上旨在封装传统上在 finally 块中处理的资源清理。

我会在此处将 API 实例设为上下文管理器:

class MyAPI:
    def __init__(self):
        self.session = aiohttp.ClientSession()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *excinfo):
        await self.session.close()

请注意,ClientSession.__aexit__() 真正做的是在 self.close() 上等待,因此以上内容直接进入协程。

然后在主循环中使用它:

async def main():
    async with MyAPI() as api:
        pass

另一种选择是向 MyAPI 实例提供您自己的会话对象,并在完成后自行负责关闭它:

class MyAPI:
    def __init__(self, session):
        self.session = session

async def main():
    session = aiohttp.ClientSession()
    try:
        api = MyAPI(session)
        # do things with the API
    finally:
        await session.close()

正如@Martijn Pieters 所说,您不能强制事件循环等待对象的 __del__ 析构函数调用。但是,您仍然可以使用 __del__ 析构函数来关闭异步资源,方法是首先检查循环是否为 运行,如果不是,则启动一个新循环。例如,asyncio Redis 模块就使用了这种技术 when destructing its Client class。对于您的代码,具体来说,析构函数如下:

import asyncio
import aiohttp


class MyAPI:

    def __init__(self):
        self.session = aiohttp.ClientSession()

    def __del__(self):
        # Close connection when this object is destroyed
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                loop.create_task(self.session.close())
            else:
                loop.run_until_complete(self.session.close())
        except Exception:
            pass

我在写Django程序的时候(使用asgi)实现了共享session的方式。使用pid来标记不同进程的session,方便django在不同进程之间调用。

经过实际测试,我可以直接调用共享会话

  • Django 3.2
  • uvicorn

aiohttp.py

import os
import asyncio
import aiohttp
import logging

session_list = {}
logger = logging.getLogger(__name__)


class Req:

    @property
    def set_session(self):
        try:
            loop = asyncio.get_running_loop()
        except:
            loop = asyncio.get_event_loop()
            asyncio.set_event_loop(loop)
        session = aiohttp.ClientSession(loop=loop)
        session_list.update({os.getpid(): session})
        return session

    def __init__(self):
        if session_list.get(os.getpid()):
            self.session = session_list.get(os.getpid())
        else:
            self.session = self.set_session

    async def test(self):
        if session_list:
            session = session_list.get(os.getpid())
            if session and session.closed:
                session_list.pop(os.getpid())
                session = self.set_session
        else:
            session = self.set_session

        if not session or session.loop.is_running():
            session = self.set_session
            logger.warning("session abnormal")
        result = await session.get("http://httpbing.org/get")
        print(result.status)


req = Req()

views.py

from django.http import HttpResponse
from django.shortcuts import render  # noqa
from django.views.generic import View
from django.utils.decorators import classonlymethod

import asyncio


class TTT(View):

    @classonlymethod
    def as_view(cls, **initkwargs):
        view = super().as_view(**initkwargs)
        view._is_coroutine = asyncio.coroutines._is_coroutine
        return view

    async def get(self, request):
        await req.test()
        return HttpResponse("ok")

谢谢你,@alan。 我使用了您的示例并向其中添加了一些输入。我在 class 中使用 pyppeteer。不能 100% 确定它是否正确,但至少不会再出现关于 运行 循环的异常,并且它作为 __del__ 的一部分执行。我现在将其用作包装函数,将我的异步代码转换为同步代码。它有点难看,但它有效。 我现在可以在对象被销毁时安全地关闭浏览器实例。

我输入的例子

from asyncio import get_event_loop
from typing import TypeVar, Callable, Coroutine, Any

ReturnType = TypeVar("ReturnType")


def async_to_sync(callable_function: Callable[[], Coroutine[Any, Any, ReturnType]]) -> ReturnType:
    loop = get_event_loop()
    if loop.is_running():
        return loop.create_task(callable_function())
    else:
        return loop.run_until_complete(callable_function())