如何使用超时迭代异步迭代器?
How to iterate over an asynchronous iterator with a timeout?
我觉得从代码的角度更容易理解:
try:
async for item in timeout(something(), timeout=60):
await do_something_useful(item)
except asyncio.futures.TimeoutError:
await refresh()
我希望 async for
到 运行 最多 60 秒。
一种简单的方法是使用asyncio.Queue
,并将代码分成两个协程:
queue = asyncio.Queue()
async for item in something():
await queue.put(item)
在另一个协程中:
while True:
try:
item = await asyncio.wait_for(queue.get(), 60)
except asyncio.TimeoutError:
pass
else:
if item is None:
break # use None or whatever suits you to gracefully exit
await do_something_useful(item)
refresh()
请注意,如果处理程序 do_something_useful()
比 something()
生成项目慢,这将使队列增长。您可以在队列上设置 maxsize
以限制缓冲区大小。
AsyncTimedIterable
可能是您代码中 timeout()
的实现:
class _AsyncTimedIterator:
__slots__ = ('_iterator', '_timeout', '_sentinel')
def __init__(self, iterable, timeout, sentinel):
self._iterator = iterable.__aiter__()
self._timeout = timeout
self._sentinel = sentinel
async def __anext__(self):
try:
return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)
except asyncio.TimeoutError:
return self._sentinel
class AsyncTimedIterable:
__slots__ = ('_factory', )
def __init__(self, iterable, timeout=None, sentinel=None):
self._factory = lambda: _AsyncTimedIterator(iterable, timeout, sentinel)
def __aiter__(self):
return self._factory()
(原答案)
或使用此 class 替换您的 timeout()
函数:
class AsyncTimedIterable:
def __init__(self, iterable, timeout=None, sentinel=None):
class AsyncTimedIterator:
def __init__(self):
self._iterator = iterable.__aiter__()
async def __anext__(self):
try:
return await asyncio.wait_for(self._iterator.__anext__(),
timeout)
except asyncio.TimeoutError:
return sentinel
self._factory = AsyncTimedIterator
def __aiter__(self):
return self._factory()
根据 refresh
函数的性质,您的问题的答案可能会有所不同。如果是很短的-运行ning函数,可以在协程中自由调用。但如果它是阻塞函数(由于网络或 CPU),它应该是 运行 in executor 以避免冻结异步事件循环。
下面的代码显示了第一种情况的示例,在执行程序中将其更改为 运行 refresh
并不难。
第二点要说明的是异步迭代器的本质。据我了解,如果发生超时,您正在使用它从 something
或 None
获取结果。
如果我正确理解逻辑,您的代码可以使用 async_timeout 上下文管理器编写得更清晰(类似于创建 asyncio
允许的非异步样式)并且根本不使用异步迭代器:
import asyncio
from async_timeout import timeout
async def main():
while True:
try:
async with timeout(60):
res = await something()
await do_something_useful(item)
except asyncio.TimeoutError:
pass
finally:
refresh()
I want the coroutine to execute refresh
at least every 60 seconds.
如果你需要每 60 秒执行一次 refresh
而不管 do_something_useful
发生了什么,你可以用一个单独的协程来安排它:
import time
async def my_loop():
# ensure refresh() is invoked at least once in 60 seconds
done = False
async def repeat_refresh():
last_run = time.time()
while not done:
await refresh()
now = time.time()
await asyncio.sleep(max(60 - (now - last_run), 0))
last_run = now
# start repeat_refresh "in the background"
refresh_task = asyncio.get_event_loop().create_task(repeat_refresh())
try:
async for item in something():
if item is not None:
await do_something_useful(item)
await refresh()
finally:
done = True
我需要做这样的事情来创建一个 websocket(也是一个异步迭代器),如果它在一段时间后没有收到消息就会超时。我确定了以下内容:
socket_iter = socket.__aiter__()
try:
while True:
message = await asyncio.wait_for(
socket_iter.__anext__(),
timeout=10
)
except asyncio.futures.TimeoutError:
# streaming is completed
pass
你的问题缺少一些细节,但假设 something()
是一个异步迭代器或生成器,并且你希望 item
每次都是 sentinel
something
没有在超时内产生了一个值,这里是 timeout()
:
的实现
import asyncio
from typing import *
T = TypeVar('T')
# async generator, needs python 3.6
async def timeout(it: AsyncIterator[T], timeo: float, sentinel: T) -> AsyncGenerator[T, None]:
try:
nxt = asyncio.ensure_future(it.__anext__())
while True:
try:
yield await asyncio.wait_for(asyncio.shield(nxt), timeo)
nxt = asyncio.ensure_future(it.__anext__())
except asyncio.TimeoutError:
yield sentinel
except StopAsyncIteration:
pass
finally:
nxt.cancel() # in case we're getting cancelled our self
测试:
async def something():
yield 1
await asyncio.sleep(1.1)
yield 2
await asyncio.sleep(2.1)
yield 3
async def test():
expect = [1, None, 2, None, None, 3]
async for item in timeout(something(), 1, None):
print("Check", item)
assert item == expect.pop(0)
asyncio.get_event_loop().run_until_complete(test())
当wait_for()
超时时,它将取消任务。因此,我们需要将it.__anext__()
包裹在一个任务中,然后屏蔽它,才能恢复迭代器。
我觉得从代码的角度更容易理解:
try:
async for item in timeout(something(), timeout=60):
await do_something_useful(item)
except asyncio.futures.TimeoutError:
await refresh()
我希望 async for
到 运行 最多 60 秒。
一种简单的方法是使用asyncio.Queue
,并将代码分成两个协程:
queue = asyncio.Queue()
async for item in something():
await queue.put(item)
在另一个协程中:
while True:
try:
item = await asyncio.wait_for(queue.get(), 60)
except asyncio.TimeoutError:
pass
else:
if item is None:
break # use None or whatever suits you to gracefully exit
await do_something_useful(item)
refresh()
请注意,如果处理程序 do_something_useful()
比 something()
生成项目慢,这将使队列增长。您可以在队列上设置 maxsize
以限制缓冲区大小。
AsyncTimedIterable
可能是您代码中 timeout()
的实现:
class _AsyncTimedIterator:
__slots__ = ('_iterator', '_timeout', '_sentinel')
def __init__(self, iterable, timeout, sentinel):
self._iterator = iterable.__aiter__()
self._timeout = timeout
self._sentinel = sentinel
async def __anext__(self):
try:
return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)
except asyncio.TimeoutError:
return self._sentinel
class AsyncTimedIterable:
__slots__ = ('_factory', )
def __init__(self, iterable, timeout=None, sentinel=None):
self._factory = lambda: _AsyncTimedIterator(iterable, timeout, sentinel)
def __aiter__(self):
return self._factory()
(原答案)
或使用此 class 替换您的 timeout()
函数:
class AsyncTimedIterable:
def __init__(self, iterable, timeout=None, sentinel=None):
class AsyncTimedIterator:
def __init__(self):
self._iterator = iterable.__aiter__()
async def __anext__(self):
try:
return await asyncio.wait_for(self._iterator.__anext__(),
timeout)
except asyncio.TimeoutError:
return sentinel
self._factory = AsyncTimedIterator
def __aiter__(self):
return self._factory()
根据 refresh
函数的性质,您的问题的答案可能会有所不同。如果是很短的-运行ning函数,可以在协程中自由调用。但如果它是阻塞函数(由于网络或 CPU),它应该是 运行 in executor 以避免冻结异步事件循环。
下面的代码显示了第一种情况的示例,在执行程序中将其更改为 运行 refresh
并不难。
第二点要说明的是异步迭代器的本质。据我了解,如果发生超时,您正在使用它从 something
或 None
获取结果。
如果我正确理解逻辑,您的代码可以使用 async_timeout 上下文管理器编写得更清晰(类似于创建 asyncio
允许的非异步样式)并且根本不使用异步迭代器:
import asyncio
from async_timeout import timeout
async def main():
while True:
try:
async with timeout(60):
res = await something()
await do_something_useful(item)
except asyncio.TimeoutError:
pass
finally:
refresh()
I want the coroutine to execute
refresh
at least every 60 seconds.
如果你需要每 60 秒执行一次 refresh
而不管 do_something_useful
发生了什么,你可以用一个单独的协程来安排它:
import time
async def my_loop():
# ensure refresh() is invoked at least once in 60 seconds
done = False
async def repeat_refresh():
last_run = time.time()
while not done:
await refresh()
now = time.time()
await asyncio.sleep(max(60 - (now - last_run), 0))
last_run = now
# start repeat_refresh "in the background"
refresh_task = asyncio.get_event_loop().create_task(repeat_refresh())
try:
async for item in something():
if item is not None:
await do_something_useful(item)
await refresh()
finally:
done = True
我需要做这样的事情来创建一个 websocket(也是一个异步迭代器),如果它在一段时间后没有收到消息就会超时。我确定了以下内容:
socket_iter = socket.__aiter__()
try:
while True:
message = await asyncio.wait_for(
socket_iter.__anext__(),
timeout=10
)
except asyncio.futures.TimeoutError:
# streaming is completed
pass
你的问题缺少一些细节,但假设 something()
是一个异步迭代器或生成器,并且你希望 item
每次都是 sentinel
something
没有在超时内产生了一个值,这里是 timeout()
:
import asyncio
from typing import *
T = TypeVar('T')
# async generator, needs python 3.6
async def timeout(it: AsyncIterator[T], timeo: float, sentinel: T) -> AsyncGenerator[T, None]:
try:
nxt = asyncio.ensure_future(it.__anext__())
while True:
try:
yield await asyncio.wait_for(asyncio.shield(nxt), timeo)
nxt = asyncio.ensure_future(it.__anext__())
except asyncio.TimeoutError:
yield sentinel
except StopAsyncIteration:
pass
finally:
nxt.cancel() # in case we're getting cancelled our self
测试:
async def something():
yield 1
await asyncio.sleep(1.1)
yield 2
await asyncio.sleep(2.1)
yield 3
async def test():
expect = [1, None, 2, None, None, 3]
async for item in timeout(something(), 1, None):
print("Check", item)
assert item == expect.pop(0)
asyncio.get_event_loop().run_until_complete(test())
当wait_for()
超时时,它将取消任务。因此,我们需要将it.__anext__()
包裹在一个任务中,然后屏蔽它,才能恢复迭代器。