How to iterate over an asynchronous iterator with a timeout?

I think it is easier to understand in terms of code:

try:
    async for item in timeout(something(), timeout=60):
        await do_something_useful(item)
except asyncio.TimeoutError:
    await refresh()

I want the async for loop to run for at most 60 seconds.

A simple approach is to use an asyncio.Queue and split the code into two coroutines:

queue = asyncio.Queue()
async for item in something():
    await queue.put(item)

In another coroutine:

while True:
    try:
        item = await asyncio.wait_for(queue.get(), 60)
    except asyncio.TimeoutError:
        pass
    else:
        if item is None:
            break  # use None or whatever suits you to gracefully exit
        await do_something_useful(item)
    await refresh()  # refresh() is awaited in the question, so it is treated as a coroutine here

Note that the queue will grow if the handler do_something_useful() is slower than something() generates items. You can set maxsize on the queue to limit the buffer size.
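
As a rough illustration of the maxsize remark, the sketch below wires the two coroutines together with a bounded queue. The something() generator, the 0.01 s sleep standing in for do_something_useful(), and the choice of None as the exit sentinel are all assumptions made for the example, not part of the original answer.

```python
import asyncio


async def something():
    # Hypothetical fast producer standing in for the question's something().
    for i in range(5):
        yield i


async def producer(queue):
    async for item in something():
        # put() waits while the queue is full, so the buffer never
        # holds more than maxsize items.
        await queue.put(item)
    await queue.put(None)  # assumed sentinel: tells the consumer to stop


async def consumer(queue, seen, timeout=1.0):
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            continue  # nothing arrived in time; a refresh could go here
        if item is None:
            break
        await asyncio.sleep(0.01)  # stands in for do_something_useful(item)
        seen.append(item)


async def main():
    queue = asyncio.Queue(maxsize=2)  # at most two buffered items
    seen = []
    await asyncio.gather(producer(queue), consumer(queue, seen))
    return seen


result = asyncio.run(main())
```

With a bounded queue the producer is slowed down to the consumer's pace instead of buffering without limit.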

AsyncTimedIterable could be the implementation of timeout() in your code:

class _AsyncTimedIterator:

    __slots__ = ('_iterator', '_timeout', '_sentinel')

    def __init__(self, iterable, timeout, sentinel):
        self._iterator = iterable.__aiter__()
        self._timeout = timeout
        self._sentinel = sentinel

    async def __anext__(self):
        try:
            return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)
        except asyncio.TimeoutError:
            return self._sentinel


class AsyncTimedIterable:

    __slots__ = ('_factory', )

    def __init__(self, iterable, timeout=None, sentinel=None):
        self._factory = lambda: _AsyncTimedIterator(iterable, timeout, sentinel)

    def __aiter__(self):
        return self._factory()

(Original answer)

Or use this class to replace your timeout() function:

class AsyncTimedIterable:
    def __init__(self, iterable, timeout=None, sentinel=None):
        class AsyncTimedIterator:
            def __init__(self):
                self._iterator = iterable.__aiter__()

            async def __anext__(self):
                try:
                    return await asyncio.wait_for(self._iterator.__anext__(),
                                                  timeout)
                except asyncio.TimeoutError:
                    return sentinel

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        return self._factory()

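The answer to your question may differ depending on the nature of the refresh function. If it is a very short-running function, it can be called freely inside a coroutine. But if it is a blocking function (because of network or CPU work), it should be run in an executor to avoid freezing the asyncio event loop.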

The code below shows an example of the first case; changing it to run refresh in an executor is not hard.

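The second thing to clarify is the nature of the asynchronous iterator. As far as I understand, you are using it to get either a result from something, or None if a timeout occurs.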

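If I understand the logic correctly, your code can be written more clearly using the async_timeout context manager (in the synchronous-looking style that asyncio allows) and without using an asynchronous iterator at all: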

import asyncio
from async_timeout import timeout


async def main():
    while True:
        try:
            async with timeout(60):
                res = await something()
                await do_something_useful(res)
        except asyncio.TimeoutError:
            pass
        finally:
            refresh()
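
For the second case, where refresh blocks, a minimal sketch of handing it to the default executor might look like the following. The refresh() body here is a hypothetical stand-in that just sleeps, and the ticker coroutine exists only to show that the event loop keeps running while the blocking call is in progress.

```python
import asyncio
import time


def refresh():
    # Hypothetical blocking refresh: sleeps to imitate network or CPU work.
    time.sleep(0.05)
    return "refreshed"


async def main():
    loop = asyncio.get_running_loop()
    ticks = 0

    async def ticker():
        # Keeps counting while refresh() runs in a worker thread.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    ticker_task = asyncio.ensure_future(ticker())
    # None selects the loop's default ThreadPoolExecutor.
    outcome = await loop.run_in_executor(None, refresh)
    ticker_task.cancel()
    return outcome, ticks


outcome, ticks = asyncio.run(main())
```

Calling refresh() directly inside the coroutine would instead stall every other task for the full duration of the call.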

I want the coroutine to execute refresh at least every 60 seconds.

If you need refresh to run every 60 seconds regardless of what happens in do_something_useful, you can schedule it with a separate coroutine:

import time

async def my_loop():
    # ensure refresh() is invoked at least once in 60 seconds
    done = False
    async def repeat_refresh():
        last_run = time.time()
        while not done:
            await refresh()
            now = time.time()
            await asyncio.sleep(max(60 - (now - last_run), 0))
            last_run = now
    # start repeat_refresh "in the background"
    refresh_task = asyncio.get_event_loop().create_task(repeat_refresh())

    try:
        async for item in something():
            if item is not None:
                await do_something_useful(item)
            await refresh()
    finally:
        done = True

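I needed to do something like this to create a websocket (also an asynchronous iterator) that times out if it does not receive a message within a certain period. I settled on the following: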

socket_iter = socket.__aiter__()
try:
    while True:
        message = await asyncio.wait_for(
            socket_iter.__anext__(),
            timeout=10
        )
except asyncio.TimeoutError:
    # streaming is completed
    pass

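Your question is missing a few details, but assuming something() is an asynchronous iterator or generator, and you want item to be sentinel every time something has not yielded a value within the timeout, here is an implementation of timeout():
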
import asyncio
from typing import AsyncGenerator, AsyncIterator, TypeVar

T = TypeVar('T')

# async generator, needs python 3.6
async def timeout(it: AsyncIterator[T], timeo: float, sentinel: T) -> AsyncGenerator[T, None]:
    try:
        nxt = asyncio.ensure_future(it.__anext__())
        while True:
            try:
                yield await asyncio.wait_for(asyncio.shield(nxt), timeo)
                nxt = asyncio.ensure_future(it.__anext__())
            except asyncio.TimeoutError:
                yield sentinel
    except StopAsyncIteration:
        pass
    finally:
        nxt.cancel()  # in case we're getting cancelled our self

Test:

async def something():
    yield 1
    await asyncio.sleep(1.1)
    yield 2
    await asyncio.sleep(2.1)
    yield 3


async def test():
    expect = [1, None, 2, None, None, 3]
    async for item in timeout(something(), 1, None):
        print("Check", item)
        assert item == expect.pop(0)

asyncio.get_event_loop().run_until_complete(test())

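When wait_for() times out, it cancels the task it is waiting on. Therefore we need to wrap it.__anext__() in a task and then shield it, so that the iterator can be resumed after a timeout.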
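
To make the effect of the shield concrete, here is a small comparison sketch. It assumes something() is an async generator (shortened to two items with scaled-down delays for the demo); the helper names collect_unshielded and collect_shielded are made up for this illustration.

```python
import asyncio


async def something():
    yield 1
    await asyncio.sleep(0.5)  # slower than the 0.2 s timeout used below
    yield 2


async def collect_unshielded():
    it = something().__aiter__()
    out = []
    while True:
        try:
            out.append(await asyncio.wait_for(it.__anext__(), 0.2))
        except asyncio.TimeoutError:
            # The pending __anext__() was cancelled, which throws
            # CancelledError into the generator and finishes it.
            out.append(None)
        except StopAsyncIteration:
            return out


async def collect_shielded():
    it = something().__aiter__()
    out = []
    nxt = asyncio.ensure_future(it.__anext__())
    try:
        while True:
            try:
                out.append(await asyncio.wait_for(asyncio.shield(nxt), 0.2))
                nxt = asyncio.ensure_future(it.__anext__())
            except asyncio.TimeoutError:
                # Only the shield was cancelled; nxt keeps running and
                # is awaited again on the next pass.
                out.append(None)
    except StopAsyncIteration:
        return out
    finally:
        nxt.cancel()  # no-op if nxt has already finished


unshielded = asyncio.run(collect_unshielded())
shielded = asyncio.run(collect_shielded())
```

On CPython the unshielded run should end up as [1, None], because the generator is terminated by the first timeout, while the shielded run should produce [1, None, None, 2].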