在什么情况下 ensure_future 不能真正开始未来?

In what condition could ensure_future not actually start the future?

我正在尝试在 Python 中使用子进程中的 wget 异步下载文件。我的代码如下所示:

async def download(url, filename):
    wget = await asyncio.create_subprocess_exec(
        'wget', url,
        'O', filename
    )
    await wget.wait()


def main(url):
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(download(url, 'test.zip'), loop=loop)
    print("Downloading..")
    time.sleep(15)
    print("Still downloading...")
    loop.run_until_complete(future)
    loop.close()

我想做的是见证 "Downloading.." 的打印,然后 15 秒后 "Still downloading...",同时文件下载已经开始。我实际看到的是文件的下载仅在代码命中 loop.run_until_complete(future)

时才开始

我的理解是 asyncio.ensure_future 应该启动 运行 协程的代码,但显然我遗漏了一些东西。

当传递协程时,asyncio.ensure_future 将其转换为任务 - 一种知道如何驱动协程的特殊未来 - 并将其放入事件循环中。 "Enqueue" 表示协程内的代码将由调度协程的 运行ning 事件循环执行。如果事件循环不是 运行ning,那么协程的 none 也将有机会 运行。通过调用 loop.run_forever()loop.run_until_complete(some_future),循环被告知 运行。在问题中,事件循环仅在 调用 time.sleep() 之后才开始,因此下载开始延迟 15 秒。

time.sleep 应该 永远不会 在 运行 是 asyncio 事件循环的线程中调用。正确的休眠方式是 asyncio.sleep,它在等待时将控制权交给事件循环。 asyncio.sleep returns 可以提交给事件循环或从协程等待的未来:

# ... definition of download omitted ...

async def report():
    print("Downloading..")
    await asyncio.sleep(15)
    print("Still downloading...")

def main(url):
    loop = asyncio.get_event_loop()
    dltask = loop.create_task(download(url, 'test.zip'))
    loop.create_task(report())
    loop.run_until_complete(dltask)
    loop.close()

上面的代码有一个不同的问题。当下载 比 15 秒短 时,会导致打印 Task was destroyed but it is pending! 警告。问题是 report 任务在下载任务完成并且循环关闭时从未被取消,它只是被放弃了。发生这种情况通常表示存在错误或对 asyncio 的工作方式有误解,因此 asyncio 用警告标记它。

消除警告的明显方法是显式取消 report 协程的任务,但生成的代码最终变得冗长且不是很优雅。一个更简单和更短的修复是更改 report 等待下载任务,指定显示 "Still downloading..." 消息的超时:

async def dl_and_report(dltask):
    print("Downloading..")
    try:
        await asyncio.wait_for(asyncio.shield(dltask), 15)
    except asyncio.TimeoutError:
        print("Still downloading...")
        # assuming we want the download to continue; otherwise
        # remove the shield(), and dltask will be canceled
        await dltask

def main(url):
    loop = asyncio.get_event_loop()
    dltask = loop.create_task(download(url, 'test.zip'))
    loop.run_until_complete(dl_and_report(dltask))
    loop.close()