我是否正确地将 aiohttp 与 psycopg2 一起使用?
Am I using aiohttp together with psycopg2 correctly?
我刚开始使用 asyncio/aiohttp,但我有一个 Python 脚本可以从 Postgres table 读取一批 URL:s,下载 URL:s,对每次下载运行一个处理函数(与问题无关),并将处理结果保存回 table.
简化形式如下所示:
import asyncio
import psycopg2
from aiohttp import ClientSession, TCPConnector
BATCH_SIZE = 100
def _get_pgconn():
return psycopg2.connect()
def db_conn(func):
def _db_conn(*args, **kwargs):
with _get_pgconn() as conn:
with conn.cursor() as cur:
return func(cur, *args, **kwargs)
conn.commit()
return _db_conn
async def run():
async with ClientSession(connector=TCPConnector(ssl=False, limit=100)) as session:
while True:
count = await run_batch(session)
if count == 0:
break
async def run_batch(session):
tasks = []
for url in get_batch():
task = asyncio.ensure_future(process_url(url, session))
tasks.append(task)
await asyncio.gather(*tasks)
results = [task.result() for task in tasks]
save_batch_result(results)
return len(results)
async def process_url(url, session):
try:
async with session.get(url, timeout=15) as response:
body = await response.read()
return process_body(body)
except:
return {...}
@db_conn
def get_batch(cur):
sql = "SELECT id, url FROM db.urls WHERE processed IS NULL LIMIT %s"
cur.execute(sql, (BATCH_SIZE,))
return cur.fetchall()
@db_conn
def save_batch_result(cur, results):
sql = "UPDATE db.urls SET a = %(a)s, processed = true WHERE id = %(id)s"
cur.executemany(sql, tuple(results))
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
但我觉得我一定在这里遗漏了一些东西。该脚本运行但似乎每批次都变得越来越慢。特别是对 process_url
函数的调用似乎随着时间的推移变得越来越慢。此外,使用的内存不断增长,所以我猜我可能无法在运行之间正确清理某些东西?
我在增加批大小时也遇到了问题,如果我超过 200,我似乎会从调用 session.get
中获得更高比例的异常。我试过使用 TCPConnector 的 limit
参数,将它设置得更高和更低,但我看不出它有多大帮助。也在几个不同的服务器上尝试过 运行 但它似乎是一样的。有什么方法可以考虑如何更有效地设置这些值?
如果能指出我在这里可能做错的地方,将不胜感激!
我刚开始使用 asyncio/aiohttp,但我有一个 Python 脚本可以从 Postgres table 读取一批 URL:s,下载 URL:s,对每次下载运行一个处理函数(与问题无关),并将处理结果保存回 table.
简化形式如下所示:
import asyncio
import psycopg2
from aiohttp import ClientSession, TCPConnector
BATCH_SIZE = 100
def _get_pgconn():
return psycopg2.connect()
def db_conn(func):
def _db_conn(*args, **kwargs):
with _get_pgconn() as conn:
with conn.cursor() as cur:
return func(cur, *args, **kwargs)
conn.commit()
return _db_conn
async def run():
async with ClientSession(connector=TCPConnector(ssl=False, limit=100)) as session:
while True:
count = await run_batch(session)
if count == 0:
break
async def run_batch(session):
tasks = []
for url in get_batch():
task = asyncio.ensure_future(process_url(url, session))
tasks.append(task)
await asyncio.gather(*tasks)
results = [task.result() for task in tasks]
save_batch_result(results)
return len(results)
async def process_url(url, session):
try:
async with session.get(url, timeout=15) as response:
body = await response.read()
return process_body(body)
except:
return {...}
@db_conn
def get_batch(cur):
sql = "SELECT id, url FROM db.urls WHERE processed IS NULL LIMIT %s"
cur.execute(sql, (BATCH_SIZE,))
return cur.fetchall()
@db_conn
def save_batch_result(cur, results):
sql = "UPDATE db.urls SET a = %(a)s, processed = true WHERE id = %(id)s"
cur.executemany(sql, tuple(results))
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
但我觉得我一定在这里遗漏了一些东西。该脚本运行但似乎每批次都变得越来越慢。特别是对 process_url
函数的调用似乎随着时间的推移变得越来越慢。此外,使用的内存不断增长,所以我猜我可能无法在运行之间正确清理某些东西?
我在增加批大小时也遇到了问题,如果我超过 200,我似乎会从调用 session.get
中获得更高比例的异常。我试过使用 TCPConnector 的 limit
参数,将它设置得更高和更低,但我看不出它有多大帮助。也在几个不同的服务器上尝试过 运行 但它似乎是一样的。有什么方法可以考虑如何更有效地设置这些值?
如果能指出我在这里可能做错的地方,将不胜感激!