在 aiohttp 中使用共享 TCPConnector 时出错
error using shared TCPConnector in aiohttp
我一直在尝试在 aiohttp 中使用连接池,但运气不佳。用例是代码重复向几个服务器发出请求,我不想在每个请求上重新创建连接。这是重现问题的一些代码(错误是 Timeout context manager should be used inside a task
):
import asyncio
import logging
from asyncio import Future
from typing import Dict, List
from aiohttp import ClientTimeout, ClientSession, TCPConnector
class UrlService:
def __init__(self):
self.connector: TCPConnector = TCPConnector()
async def _fetch(self, session:ClientSession, url:str):
logging.info('requesting data from %s', url)
async with session.get(url) as response:
data = await response.text()
logging.info('received data from %s', url)
if response.status != 200:
text = await response.text()
return f'non 200 status for {url}: {text.strip()}'
return data
async def _make_requests(self, urls: List[str]) -> Future:
async with ClientSession(timeout=ClientTimeout(10),
connector=self.connector,
connector_owner=False) as session:
coroutines = []
for url in urls:
coroutines.append(self._fetch(session, url))
return await asyncio.gather(*coroutines)
def download(self, urls: List[str]) -> Dict[str, Dict]:
responses = asyncio.run(self._make_requests(urls))
return dict(zip(urls, responses))
if __name__ == '__main__':
url_service = UrlService()
search_urls = ['https://google.com', 'https://yahoo.com', 'https://bing.com']
data = url_service.download(search_urls)
for url, resp in data.items():
print(f'****** {url} ******')
print(f' resp len: {len(resp)}')
基于 aiobotocore
中的 this issue,我认为问题在于您是在任何预先存在的事件循环上下文之外创建 TCPConnector()
。在内部,TCPConnector
将在构建时获得自己的事件循环,并最终与之相关联。然后,当您执行 asyncio.run()
时,您最终会得到另一个实际用于 运行 _make_requests
的事件循环实例,然后它会尝试使用关联到不同事件的 TCPConnector
环形。这种不匹配似乎是错误的原因。
当我将 self.connector = TCPConnector()
行移动到 _make_requests()
的正文中时,问题就消失了。
我一直在尝试在 aiohttp 中使用连接池,但运气不佳。用例是代码重复向几个服务器发出请求,我不想在每个请求上重新创建连接。这是重现问题的一些代码(错误是 Timeout context manager should be used inside a task
):
import asyncio
import logging
from asyncio import Future
from typing import Dict, List
from aiohttp import ClientTimeout, ClientSession, TCPConnector
class UrlService:
def __init__(self):
self.connector: TCPConnector = TCPConnector()
async def _fetch(self, session:ClientSession, url:str):
logging.info('requesting data from %s', url)
async with session.get(url) as response:
data = await response.text()
logging.info('received data from %s', url)
if response.status != 200:
text = await response.text()
return f'non 200 status for {url}: {text.strip()}'
return data
async def _make_requests(self, urls: List[str]) -> Future:
async with ClientSession(timeout=ClientTimeout(10),
connector=self.connector,
connector_owner=False) as session:
coroutines = []
for url in urls:
coroutines.append(self._fetch(session, url))
return await asyncio.gather(*coroutines)
def download(self, urls: List[str]) -> Dict[str, Dict]:
responses = asyncio.run(self._make_requests(urls))
return dict(zip(urls, responses))
if __name__ == '__main__':
url_service = UrlService()
search_urls = ['https://google.com', 'https://yahoo.com', 'https://bing.com']
data = url_service.download(search_urls)
for url, resp in data.items():
print(f'****** {url} ******')
print(f' resp len: {len(resp)}')
基于 aiobotocore
中的 this issue,我认为问题在于您是在任何预先存在的事件循环上下文之外创建 TCPConnector()
。在内部,TCPConnector
将在构建时获得自己的事件循环,并最终与之相关联。然后,当您执行 asyncio.run()
时,您最终会得到另一个实际用于 运行 _make_requests
的事件循环实例,然后它会尝试使用关联到不同事件的 TCPConnector
环形。这种不匹配似乎是错误的原因。
当我将 self.connector = TCPConnector()
行移动到 _make_requests()
的正文中时,问题就消失了。