`async for` 的语义 - __anext__ 调用可以重叠吗?
Semantics of `async for` - can __anext__ calls overlap?
如果 __anext__
将控制权交还给事件循环(通过 await
或 call_soon
/call_later
),是否有可能另一个 __anext__
在同一个实例上调用,而第一个实例尚未解决,否则它们将排队?有没有其他情况假设只有一个 __anext__
同时是 运行 是不安全的?
简答: 使用 async for
不会重叠,但调用 __anext__
会重叠。
长答案:
这是我在玩 __anext__
机制时所做的:
import asyncio
class Foo(object):
def __init__(self):
self.state = 0
def __aiter__(self):
return self
def __anext__(self):
def later():
try:
print(f'later: called when state={self.state}')
self.state += 1
if self.state == 3:
future.set_exception(StopAsyncIteration())
else:
future.set_result(self.state)
finally:
print(f'later: left when state={self.state}')
print(f'__anext__: called when state={self.state}')
try:
future = asyncio.Future()
loop.call_later(0.1, later)
return future
finally:
print(f'__anext__: left when state={self.state}')
async def main():
print('==== async for ====')
foo = Foo()
async for x in foo:
print('>', x)
print('==== __anext__() ====')
foo = Foo()
a = foo.__anext__()
b = foo.__anext__()
c = foo.__anext__()
print('>', await a)
print('>', await b)
print('>', await c)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks()))
loop.close()
我已经实现了 __anext__
到 return 未来,而不仅仅是 async def
,所以我可以更好地控制解决这些未来的中间步骤。
这是一个输出:
==== async for ====
__anext__: called when state=0
__anext__: left when state=0
later: called when state=0
later: left when state=1
> 1
__anext__: called when state=1
__anext__: left when state=1
later: called when state=1
later: left when state=2
> 2
__anext__: called when state=2
__anext__: left when state=2
later: called when state=2
later: left when state=3
==== __anext__() ====
__anext__: called when state=0
__anext__: left when state=0
__anext__: called when state=0
__anext__: left when state=0
__anext__: called when state=0
__anext__: left when state=0
later: called when state=0
later: left when state=1
later: called when state=1
later: left when state=2
later: called when state=2
later: left when state=3
> 1
> 2
~~~ dies with StopAsyncIteration ~~~
在 async for
的情况下,可以看到 __anext__
首先完成,然后事件循环开始,运行 s 任何被安排的延迟。如果 __anext__
正在堆叠,事件循环将借此机会安排另一个 __anext__
调用,直到延迟的 later
开始——相反,事件循环阻塞直到 later
的时间至 运行.
因此,如果您的异步迭代器仅用于 async for
,可以安全地假设同时只有一个 __anext__
运行ning。
使用 __anext__
更糟:您可以随意堆叠它们。但是,如果您的 __anext__
是协程,那么这应该没什么大不了的——无论如何调用时它都不应该保持任何状态。或者至少我是这么认为的。
是的,这可以同时执行多个 __anext__
任务。与任何其他生成器一样,每次调用 __anext__
都会执行到第一个 yield
(await
是 yield from
)。
这是安全的还是不安全的取决于实现:
- 通过队列或其他同步原语强制执行顺序的实现将是安全的
__anext__
使用内部状态的实现应该是安全的,但结果的顺序将未定义
- 修改共享状态的实现将是不安全的
现在我们有了 asynchronous generators,我想知道当实现由异步生成器提供时,对于并发调用 __anext__()
这个问题的答案是什么。我尝试了以下测试程序:
import itertools
import trio
async def foo():
for i in itertools.count():
await trio.sleep(1)
yield i
async def go(aiter):
return (await aiter.__anext__())
async def amain():
aiter = foo().__aiter__()
async with trio.open_nursery() as nursery:
nursery.start_soon(go, aiter)
nursery.start_soon(go, aiter)
trio.run(amain)
这失败了:
RuntimeError: anext(): asynchronous generator is already running
据我所知,这来自 CPython 而不是 trio。因此,至少在 CPython 中,我相信并发调用从异步生成器自动创建的 __anext__()
会被检查并且不允许。大概是由调用者来确保这不会发生。
我一直没能找到一个明确的规范来说明行为必须是这样的,所以也许不应该依赖这个。确保调用者永远不会这样做似乎是可靠的选择。
如果 __anext__
将控制权交还给事件循环(通过 await
或 call_soon
/call_later
),是否有可能另一个 __anext__
在同一个实例上调用,而第一个实例尚未解决,否则它们将排队?有没有其他情况假设只有一个 __anext__
同时是 运行 是不安全的?
简答: 使用 async for
不会重叠,但调用 __anext__
会重叠。
长答案:
这是我在玩 __anext__
机制时所做的:
import asyncio
class Foo(object):
def __init__(self):
self.state = 0
def __aiter__(self):
return self
def __anext__(self):
def later():
try:
print(f'later: called when state={self.state}')
self.state += 1
if self.state == 3:
future.set_exception(StopAsyncIteration())
else:
future.set_result(self.state)
finally:
print(f'later: left when state={self.state}')
print(f'__anext__: called when state={self.state}')
try:
future = asyncio.Future()
loop.call_later(0.1, later)
return future
finally:
print(f'__anext__: left when state={self.state}')
async def main():
print('==== async for ====')
foo = Foo()
async for x in foo:
print('>', x)
print('==== __anext__() ====')
foo = Foo()
a = foo.__anext__()
b = foo.__anext__()
c = foo.__anext__()
print('>', await a)
print('>', await b)
print('>', await c)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks()))
loop.close()
我已经实现了 __anext__
到 return 未来,而不仅仅是 async def
,所以我可以更好地控制解决这些未来的中间步骤。
这是一个输出:
==== async for ====
__anext__: called when state=0
__anext__: left when state=0
later: called when state=0
later: left when state=1
> 1
__anext__: called when state=1
__anext__: left when state=1
later: called when state=1
later: left when state=2
> 2
__anext__: called when state=2
__anext__: left when state=2
later: called when state=2
later: left when state=3
==== __anext__() ====
__anext__: called when state=0
__anext__: left when state=0
__anext__: called when state=0
__anext__: left when state=0
__anext__: called when state=0
__anext__: left when state=0
later: called when state=0
later: left when state=1
later: called when state=1
later: left when state=2
later: called when state=2
later: left when state=3
> 1
> 2
~~~ dies with StopAsyncIteration ~~~
在 async for
的情况下,可以看到 __anext__
首先完成,然后事件循环开始,运行 s 任何被安排的延迟。如果 __anext__
正在堆叠,事件循环将借此机会安排另一个 __anext__
调用,直到延迟的 later
开始——相反,事件循环阻塞直到 later
的时间至 运行.
因此,如果您的异步迭代器仅用于 async for
,可以安全地假设同时只有一个 __anext__
运行ning。
使用 __anext__
更糟:您可以随意堆叠它们。但是,如果您的 __anext__
是协程,那么这应该没什么大不了的——无论如何调用时它都不应该保持任何状态。或者至少我是这么认为的。
是的,这可以同时执行多个 __anext__
任务。与任何其他生成器一样,每次调用 __anext__
都会执行到第一个 yield
(await
是 yield from
)。
这是安全的还是不安全的取决于实现:
- 通过队列或其他同步原语强制执行顺序的实现将是安全的
__anext__
使用内部状态的实现应该是安全的,但结果的顺序将未定义- 修改共享状态的实现将是不安全的
现在我们有了 asynchronous generators,我想知道当实现由异步生成器提供时,对于并发调用 __anext__()
这个问题的答案是什么。我尝试了以下测试程序:
import itertools
import trio
async def foo():
for i in itertools.count():
await trio.sleep(1)
yield i
async def go(aiter):
return (await aiter.__anext__())
async def amain():
aiter = foo().__aiter__()
async with trio.open_nursery() as nursery:
nursery.start_soon(go, aiter)
nursery.start_soon(go, aiter)
trio.run(amain)
这失败了:
RuntimeError: anext(): asynchronous generator is already running
据我所知,这来自 CPython 而不是 trio。因此,至少在 CPython 中,我相信并发调用从异步生成器自动创建的 __anext__()
会被检查并且不允许。大概是由调用者来确保这不会发生。
我一直没能找到一个明确的规范来说明行为必须是这样的,所以也许不应该依赖这个。确保调用者永远不会这样做似乎是可靠的选择。