asyncio aiohttp 取消一个http请求轮询,return个结果

asyncio aiohttp cancel an http request polling, return a result

我正在使用此代码每 5 秒创建一个 http 请求。

async def do_request():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8000/') as resp:
            print(resp.status)
            return await resp.text()

没有找到内置的调度器,所以我写了这个函数(类似于javascript):

async def set_interval(fn, seconds):
    while True:
        await fn()
        await asyncio.sleep(seconds)

这就是我的使用方式:

asyncio.ensure_future(set_interval(do_request, 5))

代码运行良好,但我有两个要求:
1. set_interval 添加到事件循环后如何停止? (类似于 javascript clearInterval()
2. set_interval 是否有可能 return 它包装的函数的值?在此示例中,我将需要响应文本。将接受执行相同任务的其他模式。

正在取消作业

  1. How can I stop the set_interval after it is added to the event loop? (similar to javascript clearInterval())

一种选择是取消 returned 任务:

# create_task is like ensure_future, but guarantees to return a task
task = loop.create_task(set_interval(do_request, 5))
...
task.cancel()

这也将取消 set_interval 等待的任何未来。如果您不想这样,并且希望 fn() 在后台继续,请改用 await asyncio.shield(fn())

分配函数生成的值

  1. Is it possible that set_interval will return the value of the function that it is wrapping? In this example I will want the response text. Other pattern that will do the same task will be accepted.

由于 set_interval 处于无限循环中 运行,它不能 return 任何东西 - return 会终止循环。

通过异步迭代公开值

如果您需要函数中的值,一种选择是将 set_interval 重新设计为生成器。调用者将使用 async for 获取值,这是非常可读的,但也与 JavaScript 的 setInterval:

有很大不同
async def iter_interval(fn, seconds):
    while True:
        yield await fn()
        await asyncio.sleep(seconds)

用法示例:

async def x():
    print('x')
    return time.time()

async def main():
    async for obj in iter_interval(x, 1):
        print('got', obj)

# or asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())

通过未来广播值

另一种方法是循环的每次传递都将生成的值广播到其他协程可以等待的全局未来;类似于:

next_value = None

async def set_interval(fn, seconds):
    global next_value
    loop = asyncio.get_event_loop()
    while True:
        next_value = loop.create_task(fn())
        await next_value
        await asyncio.sleep(seconds)

像这样使用:

# def x() as above

async def main():
    asyncio.create_task(set_interval(x, 1))
    while True:
        await asyncio.sleep(0)
        obj = await next_value
        print('got', obj)

虽然上面的简单实现有一个主要问题:一旦提供了下一个值,next_value 不会立即被新的 Future 替换,而是在休眠之后。这意味着 main() 在一个紧密的循环中打印 "got " ,直到新的时间戳到达。这也意味着删除 await asyncio.sleep(0) 实际上会破坏它,因为循环中唯一的 await 永远不会暂停并且 set_interval 将不再有机会 运行.

这显然不是故意的。我们希望 main() 中的循环到 等待 下一个值,即使在获得初始值之后也是如此。这样做,set_interval一定要聪明一点:

next_value = None

async def set_interval(fn, seconds):
    global next_value
    loop = asyncio.get_event_loop()

    next_value = loop.create_task(fn())
    await next_value

    async def iteration_pass():
        await asyncio.sleep(seconds)
        return await fn()

    while True:
        next_value = loop.create_task(iteration_pass())
        await next_value

此版本确保 next_value 在等待前一个时立即分配。为此,它使用了一个助手 iteration_pass 协程,它作为一项方便的任务在 fn() 实际准备好 运行 之前放入 next_value。有了它,main() 看起来像这样:

async def main():
    asyncio.create_task(set_interval(x, 1))
    await asyncio.sleep(0)
    while True:
        obj = await next_value
        print('got', obj)

main() 不再是忙循环,并且预期输出正好是每秒一个时间戳。但是,我们仍然需要 initial asyncio.sleep(0) 因为当我们只调用 create_task(set_interval(...))next_value 根本不可用。这是因为使用 create_task 安排的任务仅在 return 进入事件循环后 运行。省略 sleep(0) 会导致错误 "objects of type NoneType cannot be awaited".

要解决此问题,可以将 set_interval 拆分为常规 def,后者安排初始循环迭代并立即初始化 next_value。该函数实例化并立即 return 一个协程对象来完成其余的工作。

next_value = None

def set_interval(fn, seconds):
    global next_value
    loop = asyncio.get_event_loop()

    next_value = loop.create_task(fn())

    async def iteration_pass():
        await asyncio.sleep(seconds)
        return await fn()

    async def interval_loop():
        global next_value
        while True:
            next_value = loop.create_task(iteration_pass())
            await next_value

    return interval_loop()

现在main()可以用显而易见的方式写成:

async def main():
    asyncio.create_task(set_interval(x, 1))
    while True:
        obj = await next_value
        print('got', obj)

与异步迭代相比,这种方法的优点是多个侦听器可以观察值。