Python 3.6 使用 aiohttp 的异步 GET 请求是 运行 同步的

Python 3.6 async GET requests in with aiohttp are running synchronously

我的以下功能正常运行,但由于某种原因,请求似乎是同步执行的,而不是异步执行的。

我现在的假设是,这是因为主函数中的 for record in records for 循环而发生的,但我不确定如何更改它以便请求可以异步执行。如果不是这种情况,我还需要更改什么?

async def do_request(query_string):
        base_url = 'https://maps.googleapis.com/maps/api/place/textsearch/json?'
        params = {'key': google_api_key,
                  'query': query_string}
        async with aiohttp.ClientSession() as session:
            async with session.request('GET', base_url, params=params) as resp:
                return resp


async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    for record in records:
        r = Record(record)

        if not r.is_valid:
            continue

        query_string = r.generate_query_string()

        resp = await do_request(query_string)
        print("NOW WRITE TO DATABASE")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

您正在依次等待 do_request() 个单独的呼叫。不要直接等待它们(在协程完成之前会阻塞),而是使用 asyncio.gather() function 让事件循环 运行 它们同时发生:

async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    requests = []
    for record in records:
        r = Record(record)

        if not r.is_valid:
            continue

        query_string = r.generate_query_string()

        requests.append(do_request(query_string))

    for resp in asyncio.gather(*requests):
        print("NOW WRITE TO DATABASE")

asyncio.gather() return 值是协程 return 编辑的所有结果的列表,其顺序与您将它们传递给 gather() 函数的顺序相同。

如果您需要原始记录来处理响应,您可以通过几种不同的方式将记录和查询字符串配对:

  • 将有效记录存储在单独的列表中,并在处理响应时使用 zip() 将它们再次配对
  • 使用辅助协程获取有效记录、生成查询字符串、调用请求,然后return将记录和响应一起作为一个元组。

您还可以将响应处理混合到一个聚集的协程中;一个获取记录,生成查询字符串,等待 do_request,然后在响应准备好时将结果存储在数据库中。

换句话说,将需要连续发生的工作分配到协程中,然后收集这些工作。

基于 Martijn 的回答

如果请求的顺序对您来说不太重要(当它被写入数据库时​​),您可以在获取命令时将响应写入数据库。

编辑(解释更多):我在这里使用了 2 个信号量。 1是通过aiohttp限制连接数。这将取决于您的系统。大多数 linux 系统默认为 1024。根据我个人的经验,最好将其设置为低于 OS 最大值。

max_coroutines是解决协程运行一次太多的问题

我使用 asyncio.ensure_future() 以便我们在构建列表时 运行 协程。这样,您就不会在执行任何协程之前创建完整的协程列表。

# Limit the total number of requests you make by 512 open connections.
max_request_semaphore = asyncio.BoundedSemaphore(512)
max_coroutines = asyncio.BoundedSemaphore(10000)


async def process_response(response):
    print('Process your response to your database')


async def do_request(query_string):
    base_url = 'https://maps.googleapis.com/maps/api/place/textsearch/json?'
    params = {'key': google_api_key,
              'query': query_string}
    async with max_request_semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.request('GET', base_url, params=params) as resp:
                return resp


# Excuse me for the bad function naming
async do_full_request(query_string):
    resp = await do_request(query_string)
    await process_response(resp)
    max_coroutines.release()

async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    requests = []
    for record in records:
        r = Record(record)

        if not r.is_valid:
            continue

        query_string = r.generate_query_string()

        # Will prevent more than 10k coroutines created.
        await max_coroutines.acquire()
        requests.append(
            asyncio.ensure_future(
                do_full_request(query_string)))

    # Now gather all the coroutines
    await asyncio.gather(*requests)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())