在什么情况下 ensure_future 不能真正开始未来?
In what condition could ensure_future not actually start the future?
我正在尝试在 Python 中使用子进程中的 wget 异步下载文件。我的代码如下所示:
async def download(url, filename):
wget = await asyncio.create_subprocess_exec(
'wget', url,
'O', filename
)
await wget.wait()
def main(url):
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(download(url, 'test.zip'), loop=loop)
print("Downloading..")
time.sleep(15)
print("Still downloading...")
loop.run_until_complete(future)
loop.close()
我想做的是见证 "Downloading.." 的打印,然后 15 秒后 "Still downloading...",同时文件下载已经开始。我实际看到的是文件的下载仅在代码命中 loop.run_until_complete(future)
时才开始
我的理解是 asyncio.ensure_future 应该启动 运行 协程的代码,但显然我遗漏了一些东西。
当传递协程时,asyncio.ensure_future
将其转换为任务 - 一种知道如何驱动协程的特殊未来 - 并将其放入事件循环中。 "Enqueue" 表示协程内的代码将由调度协程的 运行ning 事件循环执行。如果事件循环不是 运行ning,那么协程的 none 也将有机会 运行。通过调用 loop.run_forever()
或 loop.run_until_complete(some_future)
,循环被告知 运行。在问题中,事件循环仅在 在 调用 time.sleep()
之后才开始,因此下载开始延迟 15 秒。
time.sleep
应该 永远不会 在 运行 是 asyncio
事件循环的线程中调用。正确的休眠方式是 asyncio.sleep
,它在等待时将控制权交给事件循环。 asyncio.sleep
returns 可以提交给事件循环或从协程等待的未来:
# ... definition of download omitted ...
async def report():
print("Downloading..")
await asyncio.sleep(15)
print("Still downloading...")
def main(url):
loop = asyncio.get_event_loop()
dltask = loop.create_task(download(url, 'test.zip'))
loop.create_task(report())
loop.run_until_complete(dltask)
loop.close()
上面的代码有一个不同的问题。当下载 比 15 秒短 时,会导致打印 Task was destroyed but it is pending!
警告。问题是 report
任务在下载任务完成并且循环关闭时从未被取消,它只是被放弃了。发生这种情况通常表示存在错误或对 asyncio
的工作方式有误解,因此 asyncio
用警告标记它。
消除警告的明显方法是显式取消 report
协程的任务,但生成的代码最终变得冗长且不是很优雅。一个更简单和更短的修复是更改 report
等待下载任务,指定显示 "Still downloading..." 消息的超时:
async def dl_and_report(dltask):
print("Downloading..")
try:
await asyncio.wait_for(asyncio.shield(dltask), 15)
except asyncio.TimeoutError:
print("Still downloading...")
# assuming we want the download to continue; otherwise
# remove the shield(), and dltask will be canceled
await dltask
def main(url):
loop = asyncio.get_event_loop()
dltask = loop.create_task(download(url, 'test.zip'))
loop.run_until_complete(dl_and_report(dltask))
loop.close()
我正在尝试在 Python 中使用子进程中的 wget 异步下载文件。我的代码如下所示:
async def download(url, filename):
wget = await asyncio.create_subprocess_exec(
'wget', url,
'O', filename
)
await wget.wait()
def main(url):
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(download(url, 'test.zip'), loop=loop)
print("Downloading..")
time.sleep(15)
print("Still downloading...")
loop.run_until_complete(future)
loop.close()
我想做的是见证 "Downloading.." 的打印,然后 15 秒后 "Still downloading...",同时文件下载已经开始。我实际看到的是文件的下载仅在代码命中 loop.run_until_complete(future)
时才开始我的理解是 asyncio.ensure_future 应该启动 运行 协程的代码,但显然我遗漏了一些东西。
当传递协程时,asyncio.ensure_future
将其转换为任务 - 一种知道如何驱动协程的特殊未来 - 并将其放入事件循环中。 "Enqueue" 表示协程内的代码将由调度协程的 运行ning 事件循环执行。如果事件循环不是 运行ning,那么协程的 none 也将有机会 运行。通过调用 loop.run_forever()
或 loop.run_until_complete(some_future)
,循环被告知 运行。在问题中,事件循环仅在 在 调用 time.sleep()
之后才开始,因此下载开始延迟 15 秒。
time.sleep
应该 永远不会 在 运行 是 asyncio
事件循环的线程中调用。正确的休眠方式是 asyncio.sleep
,它在等待时将控制权交给事件循环。 asyncio.sleep
returns 可以提交给事件循环或从协程等待的未来:
# ... definition of download omitted ...
async def report():
print("Downloading..")
await asyncio.sleep(15)
print("Still downloading...")
def main(url):
loop = asyncio.get_event_loop()
dltask = loop.create_task(download(url, 'test.zip'))
loop.create_task(report())
loop.run_until_complete(dltask)
loop.close()
上面的代码有一个不同的问题。当下载 比 15 秒短 时,会导致打印 Task was destroyed but it is pending!
警告。问题是 report
任务在下载任务完成并且循环关闭时从未被取消,它只是被放弃了。发生这种情况通常表示存在错误或对 asyncio
的工作方式有误解,因此 asyncio
用警告标记它。
消除警告的明显方法是显式取消 report
协程的任务,但生成的代码最终变得冗长且不是很优雅。一个更简单和更短的修复是更改 report
等待下载任务,指定显示 "Still downloading..." 消息的超时:
async def dl_and_report(dltask):
print("Downloading..")
try:
await asyncio.wait_for(asyncio.shield(dltask), 15)
except asyncio.TimeoutError:
print("Still downloading...")
# assuming we want the download to continue; otherwise
# remove the shield(), and dltask will be canceled
await dltask
def main(url):
loop = asyncio.get_event_loop()
dltask = loop.create_task(download(url, 'test.zip'))
loop.run_until_complete(dl_and_report(dltask))
loop.close()