获取嵌套网址时如何在异步中链接协程

How to chain coroutines in asyncio when fetch nested urls

我目前正在设计一个蜘蛛来抓取特定网站。我可以同步进行,但我正试图让我的头脑围绕 asyncio 使其尽可能高效。我尝试了很多不同的方法,使用 yieldchained functionsqueues,但我无法使它起作用。

我最感兴趣的是设计部分和解决问题的逻辑。不需要可运行的代码,而是突出 assyncio 最重要的方面。我不能 post 任何代码,因为我的尝试不值得分享。

使命:

exemple.com(我知道,应该是example.com)得到了如下设计:

在同步方式下,逻辑是这样的:

for table in my_url_list:
    # Get HTML
    # Extract urls from HTML to user_list
    for user in user_list:
        # Get HTML
        # Extract urls from HTML to user_subcat_list
        for subcat in user_subcat_list:
            # extract content

但现在我想异步抓取网站。假设我们使用 5 个实例(pyppeteer 中的选项卡或 aiohttp 中的请求)来解析内容。我们应该如何设计它以使其最有效以及我们应该使用什么 asyncio 语法?

更新

感谢@user4815162342 解决了我的问题。我一直在研究他的解决方案,如果其他人想研究 asyncio,我会在下面 post 运行代码。

import asyncio
import random
 
my_url_list = ['exemple.com/table1', 'exemple.com/table2', 'exemple.com/table3']


# Random sleeps to simulate requests to the server
async def randsleep(caller=None):
    i = random.randint(1, 6)
    if caller:
        print(f"Request HTML for {caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)


async def process_urls(url_list):
    print(f'async def process_urls: added {url_list}')
    limit = asyncio.Semaphore(5)
    coros = [process_user_list(table, limit) for table in url_list]
    await asyncio.gather(*coros)


async def process_user_list(table, limit):
    async with limit:
        # Simulate HTML request and extracting urls to populate user_list
        await randsleep(table)
        if table[-1] == '1':
            user_list = ['exemple.com/user1', 'exemple.com/user2', 'exemple.com/user3']
        elif table[-1] == '2':
            user_list = ['exemple.com/user4', 'exemple.com/user5', 'exemple.com/user6']
        else:
            user_list = ['exemple.com/user7', 'exemple.com/user8', 'exemple.com/user9']

        print(f'async def process_user_list: Extracted {user_list} from {table}')

    # Execute process_user in parallel, but do so outside the `async with`
    # because process_user will also need the semaphore, and we don't need
    # it any more since we're done with fetching HTML.
    coros = [process_user(user, limit) for user in user_list]
    await asyncio.gather(*coros)


async def process_user(user, limit):
    async with limit:
        # Simulate HTML request and extracting urls to populate user_subcat_list
        await randsleep(user)
        user_subcat_list = [user + '/profile', user + '/info', user + '/followers']
        print(f'async def process_user: Extracted {user_subcat_list} from {user}')

    coros = [process_subcat(subcat, limit) for subcat in user_subcat_list]
    await asyncio.gather(*coros)


async def process_subcat(subcat, limit):
    async with limit:
        # Simulate HTML request and extracting content
        await randsleep(subcat)
        print(f'async def process_subcat: Extracted content from {subcat}')

if __name__ == '__main__':
    asyncio.run(process_urls(my_url_list))

让我们重构同步代码,让每个可以访问网络的部分都在一个单独的函数中。功能不变,但以后会更方便:

def process_urls(url_list):
    for table in url_list:
        process_user_list(table)

def process_user_list(table):
    # Get HTML, extract user_list
    for user in user_list:
        process_user(user)

def process_user(user):
    # Get HTML, extract user_subcat_list
    for subcat in user_subcat_list:
        process_subcat(subcat)

def process_subcat(subcat):
    # get HTML, extract content

if __name__ == '__main__':
    process_urls(my_url_list)

假设处理顺序无关紧要,我们希望异步版本 运行 现在在 for 循环中并行调用的所有函数。它们仍然 运行 在单个线程上,但它们会 await 任何可能阻塞的东西,允许事件循环并行等待并通过在每个协程准备好继续时恢复它们来驱动它们完成.这是通过将每个协程作为一个单独的任务来实现的,该任务 运行 独立于其他任务,因此是并行的。例如,process_urls 的顺序(但仍然是异步)版本将如下所示:

async def process_urls(url_list):
    for table in url_list:
        await process_user_list(table)

这是异步的,因为它 运行 在一个事件循环中,您可以 运行 并行执行多个这样的函数(我们将很快展示如何做),但它也是顺序,因为它选择 await 每次调用 process_user_list。在每次循环迭代中,await 明确指示 asyncio 暂停执行 process_urls,直到 process_user_list 的结果可用。

我们想要的是告诉 asyncio 运行 所有 process_user_list 的并行调用,并暂停执行 process_urls 直到它们全部完成。在“后台”生成协程的基本原语是使用 asyncio.create_task 将其安排为 任务 ,这是最接近 light-weight 线程的异步等价物.使用 create_task process_urls 的并行版本将如下所示:

async def process_urls(url_list):
    # spawn a task for each table
    tasks = []
    for table in url_list:
        asyncio.create_task(process_user_list(table))
        tasks.append(task)
    # The tasks are now all spawned, so awaiting one task lets
    # them all run.
    for task in tasks:
        await task

第二个循环乍一看好像和之前的版本一样按顺序等待任务,但事实并非如此。由于每个 await 暂停到事件循环,等待任何任务允许 所有 任务进行,只要它们事先使用 create_task() 安排。总等待时间不会超过最长任务的时间,.

这种模式被经常使用,以至于 asyncio 有一个专门的实用函数,asyncio.gather。使用这个函数,相同的代码可以用更短的版本来表达:

async def process_urls(url_list):
    coros = [process_user_list(table) for table in url_list]
    await asyncio.gather(*coros)

但是还有一件事需要注意:因为 process_user_list 将从服务器获取 HTML 并且会有很多它的实例 运行ning 并行,我们不能让它通过数百个同时连接来攻击服务器。我们可以创建一个工作任务池和某种队列,但 asyncio 提供了一个更优雅的解决方案:semaphore。信号量是一种同步设备,不允许超过 pre-determined 个并行激活,使其余的排队等待。

process_urls 的最终版本创建了一个信号量并将其向下传递。它不会激活信号量,因为 process_urls 本身实际上并没有获取任何 HTML,因此它没有理由在 process_user_list 是 运行宁.

async def process_urls(url_list):
    limit = asyncio.Semaphore(5)
    coros = [process_user_list(table, limit) for table in url_list]
    await asyncio.gather(*coros)

process_user_list 看起来很相似,但它确实需要使用 async with:

激活信号量
async def process_user_list(table, limit):
    async with limit:
        # Get HTML using aiohttp, extract user_list

    # Execute process_user in parallel, but do so outside the `async with`
    # because process_user will also need the semaphore, and we don't need
    # it any more since we're done with fetching HTML.
    coros = [process_user(user, limit) for user in user_list]
    await asyncio.gather(*coros)

process_userprocess_subcat 更相似:

async def process_user(user, limit):
    async with limit:
        # Get HTML, extract user_subcat_list
    coros = [process_subcat(subcat, limit) for subcat in user_subcat_list]
    await asyncio.gather(*coros)

def process_subcat(subcat, limit):
    async with limit:
        # get HTML, extract content
    # do something with content

if __name__ == '__main__':
    asyncio.run(process_urls(my_url_list))

在实践中,您可能希望异步函数共享同一个 aiohttp 会话,因此您可能会在 top-level 函数(在您的情况下为 process_urls )中创建它并将其传递下去与信号量一起。每个获取 HTML 的函数都会有另一个 async with 用于 aiohttp request/response,例如:

async with limit:
    async with session.get(url, params...) as resp:
        # get HTML data here
        resp.raise_for_status()
        resp = await resp.read()
# extract content from HTML data here

两个 async with 可以合并为一个,减少缩进但保持相同的含义:

async with limit, session.get(url, params...) as resp:
    # get HTML data here
    resp.raise_for_status()
    resp = await resp.read()
# extract content from HTML data here