Python 3.5 async/await 带有真实代码示例
Python 3.5 async/await with real code example
我阅读了大量关于 Python 3.5 async/await 的文章和教程。我不得不说我很困惑,因为有些使用 get_event_loop() 和 run_until_complete(),有些使用 ensure_future(),有些使用 asyncio.wait(),并且一些使用 call_soon().
看起来我有很多选择,但我不知道它们是否完全相同,或者有使用循环的情况,也有使用 wait() 的情况。
但问题是所有示例都使用 asyncio.sleep()
作为真实缓慢操作的模拟,其中 returns 是一个可等待的对象。一旦我尝试将这一行换成一些真实的代码,整个事情就失败了。上面写的方法之间到底有什么区别,我应该如何 运行 一个还没有准备好 async/await 的第三方库。我确实使用 Quandl 服务来获取一些股票数据。
import asyncio
import quandl
async def slow_operation(n):
# await asyncio.sleep(1) # Works because it's await ready.
await quandl.Dataset(n) # Doesn't work because it's not await ready.
async def main():
await asyncio.wait([
slow_operation("SIX/US9884981013EUR4"),
slow_operation("SIX/US88160R1014EUR4"),
])
# You don't have to use any code for 50 requests/day.
quandl.ApiConfig.api_key = "MY_SECRET_CODE"
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
我希望你明白我的感受是多么的迷茫,以及我想要并行地拥有 运行ning 是多么简单的事情。
如果第三方库与async/await
不兼容,那么显然您无法轻松使用它。有两种情况:
假设库中的函数是异步的,它给你一个回调,例如
def fn(..., clb):
...
所以你可以这样做:
def on_result(...):
...
fn(..., on_result)
在这种情况下,您可以像这样将此类函数包装到 asyncio 协议中:
from asyncio import Future
def wrapper(...):
future = Future()
def my_clb(...):
future.set_result(xyz)
fn(..., my_clb)
return future
(例外情况下使用 future.set_exception(exc)
)
然后您可以简单地在某些 async
函数中使用 await
:
调用该包装器
value = await wrapper(...)
请注意 await
适用于任何 Future
对象。您不必将 wrapper
声明为 async
.
如果库中的函数是同步的,那么您可以 运行 它在一个单独的线程中(您可能会为此使用一些线程池)。整个代码可能如下所示:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
# Initialize 10 threads
THREAD_POOL = ThreadPoolExecutor(10)
def synchronous_handler(param1, ...):
# Do something synchronous
time.sleep(2)
return "foo"
# Somewhere else
async def main():
loop = asyncio.get_event_loop()
futures = [
loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
]
await asyncio.wait(futures)
for future in futures:
print(future.result())
with THREAD_POOL:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
如果您出于某种原因不能使用线程,那么使用这样的库只会让整个异步代码变得毫无意义。
但是请注意,将同步库与异步一起使用可能不是一个好主意。你不会得到太多,但你会使代码复杂化很多。
您可以查看以下来自 here 的简单工作示例。顺便说一下,它 returns 一个值得一读的字符串 :-)
import aiohttp
import asyncio
async def fetch(client):
async with client.get('https://docs.aiohttp.org/en/stable/client_reference.html') as resp:
assert resp.status == 200
return await resp.text()
async def main():
async with aiohttp.ClientSession() as client:
html = await fetch(client)
print(html)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
我阅读了大量关于 Python 3.5 async/await 的文章和教程。我不得不说我很困惑,因为有些使用 get_event_loop() 和 run_until_complete(),有些使用 ensure_future(),有些使用 asyncio.wait(),并且一些使用 call_soon().
看起来我有很多选择,但我不知道它们是否完全相同,或者有使用循环的情况,也有使用 wait() 的情况。
但问题是所有示例都使用 asyncio.sleep()
作为真实缓慢操作的模拟,其中 returns 是一个可等待的对象。一旦我尝试将这一行换成一些真实的代码,整个事情就失败了。上面写的方法之间到底有什么区别,我应该如何 运行 一个还没有准备好 async/await 的第三方库。我确实使用 Quandl 服务来获取一些股票数据。
import asyncio
import quandl
async def slow_operation(n):
# await asyncio.sleep(1) # Works because it's await ready.
await quandl.Dataset(n) # Doesn't work because it's not await ready.
async def main():
await asyncio.wait([
slow_operation("SIX/US9884981013EUR4"),
slow_operation("SIX/US88160R1014EUR4"),
])
# You don't have to use any code for 50 requests/day.
quandl.ApiConfig.api_key = "MY_SECRET_CODE"
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
我希望你明白我的感受是多么的迷茫,以及我想要并行地拥有 运行ning 是多么简单的事情。
如果第三方库与async/await
不兼容,那么显然您无法轻松使用它。有两种情况:
假设库中的函数是异步的,它给你一个回调,例如
def fn(..., clb): ...
所以你可以这样做:
def on_result(...): ... fn(..., on_result)
在这种情况下,您可以像这样将此类函数包装到 asyncio 协议中:
from asyncio import Future def wrapper(...): future = Future() def my_clb(...): future.set_result(xyz) fn(..., my_clb) return future
(例外情况下使用
future.set_exception(exc)
)然后您可以简单地在某些
调用该包装器async
函数中使用await
:value = await wrapper(...)
请注意
await
适用于任何Future
对象。您不必将wrapper
声明为async
.如果库中的函数是同步的,那么您可以 运行 它在一个单独的线程中(您可能会为此使用一些线程池)。整个代码可能如下所示:
import asyncio import time from concurrent.futures import ThreadPoolExecutor # Initialize 10 threads THREAD_POOL = ThreadPoolExecutor(10) def synchronous_handler(param1, ...): # Do something synchronous time.sleep(2) return "foo" # Somewhere else async def main(): loop = asyncio.get_event_loop() futures = [ loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...), loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...), loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...), ] await asyncio.wait(futures) for future in futures: print(future.result()) with THREAD_POOL: loop = asyncio.get_event_loop() loop.run_until_complete(main())
如果您出于某种原因不能使用线程,那么使用这样的库只会让整个异步代码变得毫无意义。
但是请注意,将同步库与异步一起使用可能不是一个好主意。你不会得到太多,但你会使代码复杂化很多。
您可以查看以下来自 here 的简单工作示例。顺便说一下,它 returns 一个值得一读的字符串 :-)
import aiohttp
import asyncio
async def fetch(client):
async with client.get('https://docs.aiohttp.org/en/stable/client_reference.html') as resp:
assert resp.status == 200
return await resp.text()
async def main():
async with aiohttp.ClientSession() as client:
html = await fetch(client)
print(html)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())