异步循环遍历分页 api

Looping through a paginated api asynchronously

我目前正在通过 API 以分页方式 returns 接近 100,000 个文档(每页 100 个)摄取数据。我目前有一些代码,大致功能如下:

while c <= limit:
    if not api_url:
        break

    req = urllib2.Request(api_url)
    opener = urllib2.build_opener()
    f = opener.open(req)
    response = simplejson.load(f)

    for item in response['documents']:
        # DO SOMETHING HERE 
    if 'more_url' in response:
        api_url = response['more_url']
    else:
        api_url = None
        break
    c += 1

这样下载数据真的很慢,我想知道是否有任何方法以异步方式循环访问页面。有人建议我看一下 twisted,但我不确定如何继续。

这里的情况是,除非您调用 API,否则您事先不知道接下来会读什么。想想看,你可以并行做什么?

我不知道您可以并行执行多少任务以及哪些任务,但让我们试试...

一些假设: - 您可以从 API 中检索数据而不受惩罚或限制 - 一个 page/batch 的数据处理可以独立于另一个

完成

慢的是 IO - 所以您可以立即将代码拆分为两个并行的 运行ning 任务 - 一个将读取数据,然后将其放入队列并继续读取,除非命中 limit/empty 如果队列已满则响应或暂停

然后是第二个任务,即从队列中获取数据,并对数据进行处理

因此您可以从另一个任务中调用一个任务

另一种方法是你有一个任务,即在读取数据后立即调用另一个任务,因此它们的执行将 运行 并行但略有偏移

我将如何实施?作为 celery tasks and yes requests

例如第二个:

@task
def do_data_process(data):
   # do something with data
   pass

@task
def parse_one_page(url):
    response = requests.get(url)
    data = response.json()

    if 'more_url' in data:
        parse_one_page.delay(data['more_url'])

    # and here do data processing in this task
    do_data_process(data)
    # or call worker and try to do this in other process
    # do_data_process.delay(data)

如果您要对代码添加限制,那么您将 运行 并行执行多少任务取决于您,您甚至可以在多台机器上部署工作人员并为 parse_one_page 设置单独的队列和 do_data_process

为什么采用这种方法,而不是扭曲或异步?

因为你有 cpu-bond 数据处理(json,然后是数据),因此最好有单独的进程,celery 非常适合它们。