如何在异步循环关闭之前等待对象的 __del__ 完成?
How can I wait for an object's __del__ to finish before the async loop closes?
我有一个 class,里面会有一个 aiohttp.ClientSession 对象。
通常当你使用
async with aiohttp.ClientSession() as session:
# some code
会话将关闭,因为调用了会话的 __aexit__ 方法。
我不能使用上下文管理器,因为我想在对象的整个生命周期内保持会话持久。
这个有效:
import asyncio
import aiohttp
class MyAPI:
def __init__(self):
self.session = aiohttp.ClientSession()
def __del__(self):
# Close connection when this object is destroyed
print('In __del__ now')
asyncio.shield(self.session.__aexit__(None, None, None))
async def main():
api = MyAPI()
asyncio.run(main())
然而,如果在某些地方引发异常,事件循环会在 __aexit__ 方法完成之前关闭。
我该如何克服这个问题?
堆栈跟踪:
Traceback (most recent call last):
File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 19, in <module>
asyncio.run(main())
File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
return future.result()
File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 17, in main
raise ValueError
ValueError
In __del__ now
Exception ignored in: <function MyAPI.__del__ at 0x7f49982c0e18>
Traceback (most recent call last):
File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 11, in __del__
File "/usr/local/lib/python3.7/asyncio/tasks.py", line 765, in shield
File "/usr/local/lib/python3.7/asyncio/tasks.py", line 576, in ensure_future
File "/usr/local/lib/python3.7/asyncio/events.py", line 644, in get_event_loop
RuntimeError: There is no current event loop in thread 'MainThread'.
sys:1: RuntimeWarning: coroutine 'ClientSession.__aexit__' was never awaited
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f49982c2e10>
不要使用 __del__
挂钩来清理异步资源。你根本不能算它被调用,更不用说控制它何时被使用或者异步循环是否在那个时候仍然可用。你真的想明确地处理这个。
要么使 API 成为异步上下文管理器,要么在退出时使用 finally
处理程序显式清理资源,比如说; with
和 async with
语句基本上旨在封装传统上在 finally
块中处理的资源清理。
我会在此处将 API
实例设为上下文管理器:
class MyAPI:
def __init__(self):
self.session = aiohttp.ClientSession()
async def __aenter__(self):
return self
async def __aexit__(self, *excinfo):
await self.session.close()
请注意,ClientSession.__aexit__()
真正做的是在 self.close()
上等待,因此以上内容直接进入协程。
然后在主循环中使用它:
async def main():
async with MyAPI() as api:
pass
另一种选择是向 MyAPI
实例提供您自己的会话对象,并在完成后自行负责关闭它:
class MyAPI:
def __init__(self, session):
self.session = session
async def main():
session = aiohttp.ClientSession()
try:
api = MyAPI(session)
# do things with the API
finally:
await session.close()
正如@Martijn Pieters 所说,您不能强制事件循环等待对象的 __del__
析构函数调用。但是,您仍然可以使用 __del__
析构函数来关闭异步资源,方法是首先检查循环是否为 运行,如果不是,则启动一个新循环。例如,asyncio Redis 模块就使用了这种技术 when destructing its Client class。对于您的代码,具体来说,析构函数如下:
import asyncio
import aiohttp
class MyAPI:
def __init__(self):
self.session = aiohttp.ClientSession()
def __del__(self):
# Close connection when this object is destroyed
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(self.session.close())
else:
loop.run_until_complete(self.session.close())
except Exception:
pass
我在写Django程序的时候(使用asgi)实现了共享session的方式。使用pid来标记不同进程的session,方便django在不同进程之间调用。
经过实际测试,我可以直接调用共享会话
- Django 3.2
- uvicorn
aiohttp.py
import os
import asyncio
import aiohttp
import logging
session_list = {}
logger = logging.getLogger(__name__)
class Req:
@property
def set_session(self):
try:
loop = asyncio.get_running_loop()
except:
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
session = aiohttp.ClientSession(loop=loop)
session_list.update({os.getpid(): session})
return session
def __init__(self):
if session_list.get(os.getpid()):
self.session = session_list.get(os.getpid())
else:
self.session = self.set_session
async def test(self):
if session_list:
session = session_list.get(os.getpid())
if session and session.closed:
session_list.pop(os.getpid())
session = self.set_session
else:
session = self.set_session
if not session or session.loop.is_running():
session = self.set_session
logger.warning("session abnormal")
result = await session.get("http://httpbing.org/get")
print(result.status)
req = Req()
views.py
from django.http import HttpResponse
from django.shortcuts import render # noqa
from django.views.generic import View
from django.utils.decorators import classonlymethod
import asyncio
class TTT(View):
@classonlymethod
def as_view(cls, **initkwargs):
view = super().as_view(**initkwargs)
view._is_coroutine = asyncio.coroutines._is_coroutine
return view
async def get(self, request):
await req.test()
return HttpResponse("ok")
谢谢你,@alan。
我使用了您的示例并向其中添加了一些输入。我在 class 中使用 pyppeteer
。不能 100% 确定它是否正确,但至少不会再出现关于 运行 循环的异常,并且它作为 __del__
的一部分执行。我现在将其用作包装函数,将我的异步代码转换为同步代码。它有点难看,但它有效。
我现在可以在对象被销毁时安全地关闭浏览器实例。
我输入的例子
from asyncio import get_event_loop
from typing import TypeVar, Callable, Coroutine, Any
ReturnType = TypeVar("ReturnType")
def async_to_sync(callable_function: Callable[[], Coroutine[Any, Any, ReturnType]]) -> ReturnType:
loop = get_event_loop()
if loop.is_running():
return loop.create_task(callable_function())
else:
return loop.run_until_complete(callable_function())
我有一个 class,里面会有一个 aiohttp.ClientSession 对象。
通常当你使用
async with aiohttp.ClientSession() as session:
# some code
会话将关闭,因为调用了会话的 __aexit__ 方法。
我不能使用上下文管理器,因为我想在对象的整个生命周期内保持会话持久。
这个有效:
import asyncio
import aiohttp
class MyAPI:
def __init__(self):
self.session = aiohttp.ClientSession()
def __del__(self):
# Close connection when this object is destroyed
print('In __del__ now')
asyncio.shield(self.session.__aexit__(None, None, None))
async def main():
api = MyAPI()
asyncio.run(main())
然而,如果在某些地方引发异常,事件循环会在 __aexit__ 方法完成之前关闭。 我该如何克服这个问题?
堆栈跟踪:
Traceback (most recent call last):
File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 19, in <module>
asyncio.run(main())
File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
return future.result()
File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 17, in main
raise ValueError
ValueError
In __del__ now
Exception ignored in: <function MyAPI.__del__ at 0x7f49982c0e18>
Traceback (most recent call last):
File "/home/ron/.PyCharm2018.3/config/scratches/async.py", line 11, in __del__
File "/usr/local/lib/python3.7/asyncio/tasks.py", line 765, in shield
File "/usr/local/lib/python3.7/asyncio/tasks.py", line 576, in ensure_future
File "/usr/local/lib/python3.7/asyncio/events.py", line 644, in get_event_loop
RuntimeError: There is no current event loop in thread 'MainThread'.
sys:1: RuntimeWarning: coroutine 'ClientSession.__aexit__' was never awaited
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f49982c2e10>
不要使用 __del__
挂钩来清理异步资源。你根本不能算它被调用,更不用说控制它何时被使用或者异步循环是否在那个时候仍然可用。你真的想明确地处理这个。
要么使 API 成为异步上下文管理器,要么在退出时使用 finally
处理程序显式清理资源,比如说; with
和 async with
语句基本上旨在封装传统上在 finally
块中处理的资源清理。
我会在此处将 API
实例设为上下文管理器:
class MyAPI:
def __init__(self):
self.session = aiohttp.ClientSession()
async def __aenter__(self):
return self
async def __aexit__(self, *excinfo):
await self.session.close()
请注意,ClientSession.__aexit__()
真正做的是在 self.close()
上等待,因此以上内容直接进入协程。
然后在主循环中使用它:
async def main():
async with MyAPI() as api:
pass
另一种选择是向 MyAPI
实例提供您自己的会话对象,并在完成后自行负责关闭它:
class MyAPI:
def __init__(self, session):
self.session = session
async def main():
session = aiohttp.ClientSession()
try:
api = MyAPI(session)
# do things with the API
finally:
await session.close()
正如@Martijn Pieters 所说,您不能强制事件循环等待对象的 __del__
析构函数调用。但是,您仍然可以使用 __del__
析构函数来关闭异步资源,方法是首先检查循环是否为 运行,如果不是,则启动一个新循环。例如,asyncio Redis 模块就使用了这种技术 when destructing its Client class。对于您的代码,具体来说,析构函数如下:
import asyncio
import aiohttp
class MyAPI:
def __init__(self):
self.session = aiohttp.ClientSession()
def __del__(self):
# Close connection when this object is destroyed
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(self.session.close())
else:
loop.run_until_complete(self.session.close())
except Exception:
pass
我在写Django程序的时候(使用asgi)实现了共享session的方式。使用pid来标记不同进程的session,方便django在不同进程之间调用。
经过实际测试,我可以直接调用共享会话
- Django 3.2
- uvicorn
aiohttp.py
import os
import asyncio
import aiohttp
import logging
session_list = {}
logger = logging.getLogger(__name__)
class Req:
@property
def set_session(self):
try:
loop = asyncio.get_running_loop()
except:
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
session = aiohttp.ClientSession(loop=loop)
session_list.update({os.getpid(): session})
return session
def __init__(self):
if session_list.get(os.getpid()):
self.session = session_list.get(os.getpid())
else:
self.session = self.set_session
async def test(self):
if session_list:
session = session_list.get(os.getpid())
if session and session.closed:
session_list.pop(os.getpid())
session = self.set_session
else:
session = self.set_session
if not session or session.loop.is_running():
session = self.set_session
logger.warning("session abnormal")
result = await session.get("http://httpbing.org/get")
print(result.status)
req = Req()
views.py
from django.http import HttpResponse
from django.shortcuts import render # noqa
from django.views.generic import View
from django.utils.decorators import classonlymethod
import asyncio
class TTT(View):
@classonlymethod
def as_view(cls, **initkwargs):
view = super().as_view(**initkwargs)
view._is_coroutine = asyncio.coroutines._is_coroutine
return view
async def get(self, request):
await req.test()
return HttpResponse("ok")
谢谢你,@alan。
我使用了您的示例并向其中添加了一些输入。我在 class 中使用 pyppeteer
。不能 100% 确定它是否正确,但至少不会再出现关于 运行 循环的异常,并且它作为 __del__
的一部分执行。我现在将其用作包装函数,将我的异步代码转换为同步代码。它有点难看,但它有效。
我现在可以在对象被销毁时安全地关闭浏览器实例。
我输入的例子
from asyncio import get_event_loop
from typing import TypeVar, Callable, Coroutine, Any
ReturnType = TypeVar("ReturnType")
def async_to_sync(callable_function: Callable[[], Coroutine[Any, Any, ReturnType]]) -> ReturnType:
loop = get_event_loop()
if loop.is_running():
return loop.create_task(callable_function())
else:
return loop.run_until_complete(callable_function())