Python: How to use asyncio with a huge CSV file to send asynchronous requests from a loop?
I want to iterate over a huge list of urls and send requests to them asynchronously.
Since the CSV file containing the urls is too big to load at once, I would like to read it line by line, and each time a line is read, it should fire off a request and save the result to a file.
My problem is that, if I understand asyncio.gather correctly, all tasks have to be collected at once.
It would be great if you could tell me how to change my code so that it sends an asynchronous request for each line of the csv file.
Here is the code I am stuck with:
import asyncio
import aiohttp
import async_timeout
import csv

async def fetch( session, url ):
    async with async_timeout.timeout(10):
        try:
            async with session.get(url) as response:
                return response
        except Exception as e:
            print(str( e ))
            return False

async def write_result( result ):
    with open( 'results.csv', 'a' ) as csv_file:
        writer = csv.writer( csv_file )
        writer.writerow( result )

async def validate_page( session, url ):
    response = await fetch( session, url )
    await write_result( response )

async def main():
    async with aiohttp.ClientSession() as session:
        with open('urls.csv') as csv_file:
            for row in csv.reader( csv_file ):
                await validate_page( session, row )

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
To process each line of the csv file asynchronously, use the following approach, which applies a series of optimizations and restructurings to your current code:
- if each line contains only the url, there is no need to create a csv.reader for the input file (just iterate over the file object)
- there is no need to wrap the request in an extra async with async_timeout.timeout(10), because aiohttp.ClientSession itself has a timeout option (see the short sketch after this list)
- there is definitely no need to construct a new writer = csv.writer( csv_file ) for each processed url; create the writer object only once (and make sure writes to the shared file object are serialized with an asyncio.Lock, see below)
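As a quick aside on the second point, here is a minimal sketch (assuming aiohttp 3.x; the example.com URL is only illustrative) of setting the timeout once for the whole session via aiohttp.ClientTimeout instead of passing it to every request:

import asyncio
import aiohttp

async def main():
    # one ClientTimeout shared by every request made through this session;
    # 10 seconds mirrors the async_timeout value from the original code
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get('https://example.com') as response:
            print(response.status)

asyncio.run(main())

With those points applied, the reworked code looks like this: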
import asyncio
import aiohttp
import csv

async def fetch(session, url):
    try:
        async with session.get(url, timeout=10) as response:
            return await response.text()
    except Exception as e:
        print(url, str(e))
        return False

async def write_result(result, writer, lock):
    async with lock:  # shared lock so concurrent tasks write to the shared file object one at a time
        res = [result]  # <- adjust to the list of fields you actually need from result
        writer.writerow(res)

async def validate_page(session, url, writer, lock):
    res = await fetch(session, url)
    if res:
        await write_result(res, writer, lock)

async def main():
    lock = asyncio.Lock()  # create the lock once and share it between all tasks
    async with aiohttp.ClientSession() as session:
        with open('urls.csv') as csv_in, open('results.csv', 'a') as csv_out:
            writer = csv.writer(csv_out, delimiter=',')
            aws = [validate_page(session, url.strip(), writer, lock) for url in csv_in]
            await asyncio.gather(*aws)
            print('!--- finished processing')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
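Since the original concern was that the url file is too big to build every task up front, one possible variation (not part of the answer above; CHUNK_SIZE and the helper layout are only illustrative) is to read and gather the lines in fixed-size chunks, so that only a bounded number of coroutines exists at any time:

import asyncio
import aiohttp
import csv
from itertools import islice

CHUNK_SIZE = 100  # how many requests are built and awaited per batch

async def fetch(session, url):
    try:
        async with session.get(url, timeout=10) as response:
            return await response.text()
    except Exception as e:
        print(url, str(e))
        return False

async def validate_page(session, url, writer):
    res = await fetch(session, url)
    if res:
        writer.writerow([url])  # <- adjust to the fields you actually want to store

async def main():
    async with aiohttp.ClientSession() as session:
        with open('urls.csv') as csv_in, open('results.csv', 'a') as csv_out:
            writer = csv.writer(csv_out)
            while True:
                # islice pulls only the next CHUNK_SIZE lines from the file iterator,
                # so the whole file is never held in memory at once
                chunk = [line.strip() for line in islice(csv_in, CHUNK_SIZE)]
                if not chunk:
                    break
                await asyncio.gather(*(validate_page(session, url, writer) for url in chunk))

asyncio.run(main())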