`async for` 的语义 - __anext__ 调用可以重叠吗?

Semantics of `async for` - can __anext__ calls overlap?

如果 __anext__ 将控制权交还给事件循环(通过 awaitcall_soon/call_later),是否有可能另一个 __anext__在同一个实例上调用,而第一个实例尚未解决,否则它们将排队?有没有其他情况假设只有一个 __anext__ 同时是 运行 是不安全的?

简答: 使用 async for 不会重叠,但调用 __anext__ 会重叠。

长答案:

这是我在玩 __anext__ 机制时所做的:

import asyncio

class Foo(object):
    def __init__(self):
        self.state = 0

    def __aiter__(self):
        return self

    def __anext__(self):
        def later():
            try:
                print(f'later: called when state={self.state}')

                self.state += 1
                if self.state == 3:
                    future.set_exception(StopAsyncIteration())
                else:
                    future.set_result(self.state)
            finally:
                print(f'later: left when state={self.state}')

        print(f'__anext__: called when state={self.state}')
        try:
            future = asyncio.Future()

            loop.call_later(0.1, later)

            return future
        finally:
            print(f'__anext__: left when state={self.state}')

async def main():
    print('==== async for ====')
    foo = Foo()
    async for x in foo:
        print('>', x)

    print('==== __anext__() ====')
    foo = Foo()
    a = foo.__anext__()
    b = foo.__anext__()
    c = foo.__anext__()
    print('>', await a)
    print('>', await b)
    print('>', await c)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks()))
loop.close()

我已经实现了 __anext__ 到 return 未来,而不仅仅是 async def,所以我可以更好地控制解决这些未来的中间步骤。

这是一个输出:

==== async for ====
__anext__: called when state=0
__anext__: left when state=0
later: called when state=0
later: left when state=1
> 1
__anext__: called when state=1
__anext__: left when state=1
later: called when state=1
later: left when state=2
> 2
__anext__: called when state=2
__anext__: left when state=2
later: called when state=2
later: left when state=3
==== __anext__() ====
__anext__: called when state=0
__anext__: left when state=0
__anext__: called when state=0
__anext__: left when state=0
__anext__: called when state=0
__anext__: left when state=0
later: called when state=0
later: left when state=1
later: called when state=1
later: left when state=2
later: called when state=2
later: left when state=3
> 1
> 2
~~~ dies with StopAsyncIteration ~~~

async for 的情况下,可以看到 __anext__ 首先完成,然后事件循环开始,运行 s 任何被安排的延迟。如果 __anext__ 正在堆叠,事件循环将借此机会安排另一个 __anext__ 调用,直到延迟的 later 开始——相反,事件循环阻塞直到 later 的时间至 运行.

因此,如果您的异步迭代器仅用于 async for,可以安全地假设同时只有一个 __anext__ 运行ning。

使用 __anext__ 更糟:您可以随意堆叠它们。但是,如果您的 __anext__ 是协程,那么这应该没什么大不了的——无论如何调用时它都不应该保持任何状态。或者至少我是这么认为的。

是的,这可以同时执行多个 __anext__ 任务。与任何其他生成器一样,每次调用 __anext__ 都会执行到第一个 yieldawaityield from)。

这是安全的还是不安全的取决于实现:

  1. 通过队列或其他同步原语强制执行顺序的实现将是安全的
  2. __anext__ 使用内部状态的实现应该是安全的,但结果的顺序将未定义
  3. 修改共享状态的实现将是不安全的

现在我们有了 asynchronous generators,我想知道当实现由异步生成器提供时,对于并发调用 __anext__() 这个问题的答案是什么。我尝试了以下测试程序:

import itertools
import trio

async def foo():
  for i in itertools.count():
    await trio.sleep(1)
    yield i

async def go(aiter):
  return (await aiter.__anext__())

async def amain():
  aiter = foo().__aiter__()
  async with trio.open_nursery() as nursery:
    nursery.start_soon(go, aiter)
    nursery.start_soon(go, aiter)

trio.run(amain)

这失败了:

RuntimeError: anext(): asynchronous generator is already running

据我所知,这来自 CPython 而不是 trio。因此,至少在 CPython 中,我相信并发调用从异步生成器自动创建的 __anext__() 会被检查并且不允许。大概是由调用者来确保这不会发生。

我一直没能找到一个明确的规范来说明行为必须是这样的,所以也许不应该依赖这个。确保调用者永远不会这样做似乎是可靠的选择。