Python: 如何使用带有巨大 csv 文件的 asyncio 从循环中发送异步请求?

Python: How to use asyncio with huge csv-files to send asynchronous requests from loops?

我想遍历一个巨大的 url 列表并异步向它们发送请求。 由于带有 url 的 CSV 文件太大而无法一次加载,我想逐行读取行,每次加载行时,它应该启动一个请求并将结果保存到文件中。

我的问题是,如果我在使用 asyncio.gather 时理解正确,则必须立即收集所有任务。

如果你能告诉我如何更改我的代码,让它为 csv 文件的每一行发送异步请求,那就太好了。

这是我坚持使用的代码:

import asyncio
import aiohttp
import async_timeout

import csv

async def fetch( session, url ):
    async with async_timeout.timeout(10):
        try:
            async with session.get(url) as response:
                return response
        except Exception as e:
            print(str( e ))
            return False

async def write_result( result ):
    with open( 'results.csv', 'a' ) as csv_file:
        writer = csv.writer( csv_file )
        writer.writerow( result )

async def validate_page( session, url ):
    response = await fetch( session, url )
    await write_result( response )

async def main():
    async with aiohttp.ClientSession() as session:
        with open('urls.csv') as csv_file:
            for row in csv.reader( csv_file ):
                await validate_page( session, row )

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

要异步处理 csv 文件中的每一行,请使用以下方法。

对您当前的方法进行一系列优化和重组:

  • 如果每个单独的行仅包含 url,则无需为 csv 输入文件创建 csv.reader(只需遍历文件对象)
  • 不需要用额外的 async with async_timeout.timeout(10) 包装,因为 aiohttp.ClientSession 本身有 timeout 选项
  • 绝对不需要为每个已处理的 url 构造一个新的 writer = csv.writer( csv_file )(结果是)- 只创建一次 writer 对象(确保使用 asyncio.Lock 优雅地书写- 见下文)

import asyncio
import aiohttp

import csv

async def fetch(session, url):
    try:
        async with session.get(url, timeout=10) as response:
            return await response.text()
    except Exception as e:
        print(url, str(e))
        return False


async def write_result(result, writer):
    async with asyncio.Lock():   # lock for gracefully write to shared file object
        res = [<needed parts from result, >] # <- adjust a resulting list of strings
        writer.writerow(res)


async def validate_page(session, url, writer):
    res = await fetch(session, url)
    if res:
        await write_result(res, writer)


async def main():
    async with aiohttp.ClientSession() as session:
        with open('urls.csv') as csv_in, open('results.csv', 'a') as csv_out:
            writer = csv.writer(csv_out, delimiter=',')
            aws = [validate_page(session, url.strip(), writer) for url in csv_in]
            await asyncio.gather(*aws)
            print('!--- finished processing')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())