async 和 await 关键字究竟是如何工作的?等待链的末端是什么?

How do the async and await keywords work, exactly? What's at the end of the await chain?

我有这个代码:

async def foo(x):
    yield x
    yield x + 1

async def intermediary(y):
    await foo(y)

def bar():
    c = intermediary(5)

我要在 bar 中输入什么才能得到 c 中的 5 和 6?

我问是因为 asyncio 库看起来很神奇。我想知道魔法究竟是如何运作的。

也许我想编写自己的函数来调用 readwrite 然后通知我写的一些顶级循环它们正在等待文件描述符变得可读或可写.

然后,也许我希望顶级循环能够在这些条件变为真时恢复我的读写功能(以及顶级循环和它们之间的整个中间链)。

我已经或多或少地使用 asyncio。我写了这个little demo program,它在延迟后计算平方,但启动了很多这些任务,每个任务在随机间隔后附加到列表中。写的有点笨拙,但是很管用

我想确切地知道该程序在幕后做了什么。为了做到这一点,我必须知道睡眠上的等待如何通知顶级事件循环它想要睡眠(并再次被调用)一点,以及调用之间所有中间堆栈帧的状态进入休眠状态,顶层事件循环被冻结,然后在延迟结束时重新激活。

有一个 example in the documentation,看起来几乎与您正在尝试做的一模一样。它包含一个睡眠调用(用于代替 IO),因此 asyncio 方面很有意义。

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

您是否尝试查看 asyncio.sleep 的来源?

@coroutine                                                                       
def sleep(delay, result=None, *, loop=None):                                     
    """Coroutine that completes after a given time (in seconds)."""              
    if delay == 0:                                                               
        yield                                                                    
        return result                                                            

    if loop is None:                                                             
        loop = events.get_event_loop()                                           
    future = loop.create_future()                                                
    h = future._loop.call_later(delay,                                           
                                futures._set_result_unless_cancelled,            
                                future, result)                                  
    try:                                                                         
        return (yield from future)                                               
    finally:                                                                     
        h.cancel()

基本上就是用loop.call_later设置一个future,然后等待future。不确定这是否完全回答了您的问题,但它可能会有所帮助。

通过上面提供的代码,包含 yieldasync def 创建一个 Asynchronous Generator:

async def foo(x):
    yield x
    yield x + 1

要使用其中的数据,请使用 async for:

async def intermediary(y):
    results = []
    async for x in foo(y):
        results.append(x)
    return results

要从一个简单的协程(例如来自常规函数的 intermediary)中获取结果,您将需要创建一个事件循环并使用 run_until_complete():

loop = asyncio.get_event_loop()
result = loop.run_until_complete(intermediary(5))
print(result)
loop.close()

所以,我更了解如何让我想做的事情发挥作用。我的代码应该是这样的:

import types

@types.coroutine
def foo(x):
    yield x
    yield x + 1

async def intermediary(y):
    await foo(y)

def bar():
    c = intermediary(5)
    try:
        while True:
            result = c.send(None)
            print(f"Got {result} from the coroutine.")
    except StopIteration as e:
        print(f"StopIteration exception: {e!r}")

基本的答案是这个端点可以是一个用types.coroutine修饰的普通生成器。有更多方法可以完成这项工作,我的代码的进一步修改演示了它们:

import types
from collections.abc import Awaitable

@types.coroutine
def foo(x):
    sent = yield x
    print(f"foo was sent {sent!r}.")
    sent = yield x + 1
    print(f"foo was sent {sent!r}.")
    return 'generator'

class MyAwaitable(Awaitable):
    def __init__(self, x):
        super().__init__()
        self.x_ = x
    def __await__(self):
        def gen(x):
            for i in range(x-1, x+2):
                sent = yield i
                print(f"MyAwaitable was sent {sent!r}.")
            return 'class'
        return iter(gen(self.x_))

async def intermediary(t, y):
    awaited = await t(y)
    print(f"Got {awaited!r} as value from await.")

def runco(chain_end):
    c = intermediary(chain_end, 5)
    try:
        sendval = None
        while True:
            result = c.send(sendval)
            print(f"Got {result} from the coroutine.")
            sendval = sendval + 1 if sendval is not None else 0
    except StopIteration as e:
        print(f"StopIteration exception: {e!r}")

如您所见,任何定义 returns 迭代器的 __await__ 方法也可以被 await 编辑。真正发生的是被 await 编辑的东西被迭代直到它停止,然后是 await returns。您这样做的原因是链末端的最后一件事可能会遇到某种阻塞情况。然后它可以通过 yielding 或从迭代器返回一个值(基本上与 yielding 相同)报告该条件(或要求设置回调或其他)。然后顶层循环可以继续进行任何其他事情 运行.

整个 await 调用链的本质是,当您返回并从迭代器中请求下一个值时(回调到被阻塞的函数,告诉它它可能没有被阻塞现在)整个调用堆栈被重新激活。整个链的存在是为了在调用被阻塞时保留调用堆栈的状态。基本上是一个自愿放弃控制而不是让调度程序从中夺取控制权的线程。

当我问这个问题时,我脑海中的 asyncio 如何在内部工作的愿景显然是所谓的 curio 的工作方式,并且基于端点例程 yielding 一些一种指示他们被什么阻止的指标,以及 运行 全部(在我的示例中为 runco)的顶级循环,然后将其放入某种通用条件池中以查找一旦被阻止的条件发生变化,它就可以恢复例程。在 asyncio 中,发生了更复杂的事情,它使用具有 __await__ 方法的对象(如我的示例中的 MyAwaitable)和某种回调机制来使其全部工作。

Brett Cannon 写了一篇关于 how generators evolved into coroutines 的非常好的文章。它将比我在 Whosebug 的回答中更详细。

我发现的一个有趣的花絮是,当你这样做时:

def foo(x):
    yield 11

bar = types.coroutine(foo)

foobar 都变成 'coroutines' 并且可以 await 编辑。装饰器所做的只是在 foo.__code__.co_flags 中稍微翻转一下。当然,这是一个实现细节,不应依赖。我认为这实际上是一个错误,我可能会报告它。