
Yield from Async Generator in Python AsyncIO

I have a simple class that uses an async generator to retrieve a list of URLs:

import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

class ASYNC_GENERATOR(object):
    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """

        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()

                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)

        This returns an async generator
        """

        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()

When I execute the main part of the code:

if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()

The log prints out:

[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]

Since responses is an async generator, I expected it to yield one response from the async generator (which should only send the request when it is actually yielded), then send a separate request to the endpoint without the `x` parameter, and then yield the next response from the async generator. It should bounce back and forth between a request with an `x` parameter and a request without any parameters. Instead, it yields all of the responses from the async generator with the `x` parameter, and only then follows with all of the http requests without any parameters.

Something similar happens when I do:

ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()

and the log prints:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

What I want instead is:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

There are times when I want to retrieve all of the responses before doing anything else. However, there are also times when I want to interject and make intermediate requests before yielding the next item from the generator (e.g., the generator returns results from paginated search results, and I want to process further links from each page before moving on to the next page).

What do I need to change to achieve the desired result?

Leaving aside the technical question of whether responses is an async generator (it isn't, as Python uses the term), your problem lies in as_completed. as_completed starts a bunch of coroutines in parallel and provides the means to obtain their results as they complete. That the futures run in parallel is not exactly obvious from the documentation (improved in later versions), but it makes sense if you consider that the original concurrent.futures.as_completed works on thread-based futures, which run in parallel by their very nature. Conceptually, the same is true of asyncio futures.
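That concurrent kick-off is easy to observe in isolation. Below is a minimal sketch (the names `job` and `main` are placeholders, with `asyncio.sleep` standing in for HTTP requests) showing that `as_completed` runs all of its coroutines at once and hands results back fastest-first:

```python
import asyncio
import time

async def job(name, delay):
    # Stand-in for an HTTP request: sleep, then report which job finished.
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.monotonic()
    # Both coroutines are scheduled as soon as as_completed is iterated,
    # even if the consumer pauses between results.
    names = [await fut for fut in asyncio.as_completed(
        [job('slow', 0.2), job('fast', 0.1)])]
    elapsed = time.monotonic() - start
    return names, elapsed

names, elapsed = asyncio.run(main())
print(names)    # fastest result first
print(elapsed)  # ~0.2s total, not 0.3s: the sleeps overlapped
```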

Your code obtains only the first (fastest-arriving) result, then starts doing something else, also using asyncio. The remaining coroutines passed to as_completed are not frozen merely because no one is collecting their results - they do their work in the background and, once finished, are ready to be awaited (in your case by the code inside as_completed, which you access using loop.run_until_complete()). I would venture a guess that the URL without parameters takes longer to retrieve than the URL with just the parameter x, which is why it is printed after all the other coroutines.

In other words, those log lines being printed mean that asyncio is doing its job and providing the parallel execution you requested! If you don't want parallel execution, then don't ask for it - execute them serially:

def get_routes(self, urls):
    for url in urls:
        yield self.loop.run_until_complete(self._get_url(url))

But this is a poor way to use asyncio - its main loop is not re-entrant, so to ensure composability, you almost certainly want the loop to be spun just once at the top level. This is typically done with a construct like loop.run_until_complete(main()) or loop.run_forever(). As Martijn pointed out, you can achieve that, while retaining the nice generator API, by making get_routes an actual async generator:

async def get_routes(self, urls):
    for url in urls:
        result = await self._get_url(url)
        yield result
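The key property this buys you is laziness: the body of an async generator only runs when the consumer pulls the next item, so no request goes out up front. A toy sketch (an `events` list stands in for real requests here) that records the order of execution:

```python
import asyncio

events = []

async def gen():
    for i in range(2):
        events.append(f'produced {i}')  # runs only when the consumer pulls
        yield i

async def main():
    agen = gen()
    events.append('created')  # the generator body has not started yet
    events.append(f'got {await agen.__anext__()}')
    events.append('interjected')  # room for an intermediate request here
    events.append(f'got {await agen.__anext__()}')
    await agen.aclose()

asyncio.run(main())
print(events)
# ['created', 'produced 0', 'got 0', 'interjected', 'produced 1', 'got 1']
```

This is exactly the back-and-forth pattern the question asks for: production and consumption interleave one item at a time.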

Now you can have a main() coroutine that looks like this:

async def main():
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    async for response in responses:
        # simulate `next` with async iteration
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break
    ag.close()

LOOP.run_until_complete(main())
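As a side note, on Python 3.7+ the single top-level spin is usually written with `asyncio.run`, which creates the event loop, runs the coroutine, and closes the loop for you; a minimal sketch:

```python
import asyncio

async def main():
    # ... create the client here, `async for` over get_routes(urls), etc. ...
    await asyncio.sleep(0)
    return 'done'

# Equivalent to loop.run_until_complete(main()) plus loop cleanup.
result = asyncio.run(main())
print(result)  # done
```

One caveat under that assumption: the aiohttp session would then need to be created inside main(), since asyncio.run starts (and later closes) a fresh event loop.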