asyncio.as_completed 是如何工作的

How does asyncio.as_completed work

阅读,我运行跨越asyncio.tasks.as_completed。我不明白该功能实际上是如何工作的。它被记录为一个非异步例程,returns futures 按它们完成的顺序排列。 它创建一个与事件循环关联的队列,为每个未来添加一个完成回调,然后尝试从队列中获取与未来一样多的项目。

核心代码如下:

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    @coroutine
    def _wait_for_one():
        f = yield from done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    for _ in range(len(todo)):
        yield _wait_for_one()

我想了解这段代码的工作原理。我最大的问题是:

我无法将其与 _wait_for_one 中的 return f.result 行进行核对。记录的调用约定是否正确?如果是,那么收益从何而来?

您复制的代码缺少 header 部分,这很重要。

# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    If a timeout is specified, the 'yield from' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    loop = loop if loop is not None else events.get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    @coroutine
    def _wait_for_one():
        f = yield from done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    for _ in range(len(todo)):
        yield _wait_for_one()

[循环实际上在哪里 运行?]

为了简单起见,假设超时设置为None。

as_completed 需要可迭代的 futures,而不是协程。所以这个期货已经绑定到循环并计划执行。换句话说,这些期货是 loop.create_task 或 asyncio.ensure_futures 的输出(没有明确写明)。 所以循环已经是 "running" 他们,当他们完成时,他们未来的 .done() 方法将 return True。

然后 "done" queue 被创建。请注意 "done" queue 是 asyncio.queue 的一个实例,即 queue 实现阻塞方法(.get,.put)»使用循环«。

通过行 "todo = { ...",每个协程的未来(即 fs 的一个元素)都包裹在另一个未来»绑定到循环«,最后一个未来的 done_callback 设置为调用_on_completion 函数。

_on_completion 函数,将在循环完成协程执行时调用,协程的未来在 "fs" 中传递给 as_completed 函数。

_on_completion 函数从 todo 集合中删除 "our future" 并将其结果(即未来在 "fs" 集合中的协程)放入完成的 queue. 换句话说,as_completed 函数所做的就是将这些 futures 与 done_callback 相关联,以便将原始 futures 的结果移至 done queue.

然后,对于 len(fs) == len(todo) 次,as_completed 函数生成一个阻塞 "yield from done.get()" 的协程,等待 _on_completed(或 _on_timeout) 函数将结果放入 done 中 done queue。

由 as_completed 调用者执行的 "yield from"s 将等待结果出现在 done queue.

[收益从何而来?]

这是因为 todo 是一个 asyncio.queue,所以你可以(asyncio-)阻塞直到 queue.

中的值是 .put()