如何在 Python 中创建异步生成器?
How to create an async generator in Python?
我正在尝试将此 Python2.7 代码重写为新的异步世界秩序:
def get_api_results(func, iterable):
pool = multiprocessing.Pool(5)
for res in pool.map(func, iterable):
yield res
map()
阻塞,直到计算完所有结果,所以我试图将其重写为异步实现,一旦结果准备好就会产生结果。与 map()
一样,return 值必须按与 iterable
相同的顺序 return 编辑。我试过这个(我需要 requests
因为遗留身份验证要求):
import requests
def get(i):
r = requests.get('https://example.com/api/items/%s' % i)
return i, r.json()
async def get_api_results():
loop = asyncio.get_event_loop()
futures = []
for n in range(1, 11):
futures.append(loop.run_in_executor(None, get, n))
async for f in futures:
k, v = await f
yield k, v
for r in get_api_results():
print(r)
但使用 Python 3.6 我得到:
File "scratch.py", line 16, in <module>
for r in get_api_results():
TypeError: 'async_generator' object is not iterable
我怎样才能做到这一点?
您将事件循环放在另一个协程中。不要那样做。事件循环是最外层的异步代码'driver',应该是运行同步的。
如果您需要处理获取的结果,请编写更多协程来执行此操作。他们可以从队列中获取数据,也可以直接驱动获取。
您可以有一个获取和处理结果的主函数,例如:
async def main(loop):
for n in range(1, 11):
future = loop.run_in_executor(None, get, n)
k, v = await future
# do something with the result
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
我也会使用像 aiohttp
这样的异步库使 get()
函数正确地异步,所以你根本不必使用执行程序。
关于您的旧 (2.7) 代码 - 多处理被认为是更简单的线程模块的强大替代品,用于并发处理 CPU 密集型任务,其中线程不能很好地工作。您的代码可能未 CPU 绑定 - 因为它只需要发出 HTTP 请求 - 线程可能足以解决您的问题。
然而,Python 3+ 没有直接使用 threading
,而是有一个很好的模块,叫做 concurrent.futures that with a cleaner API via cool Executor
classes. This module is available also for python 2.7 as an external package。
以下代码适用于 python 2 和 python 3:
# For python 2, first run:
#
# pip install futures
#
from __future__ import print_function
import requests
from concurrent import futures
URLS = [
'http://httpbin.org/delay/1',
'http://httpbin.org/delay/3',
'http://httpbin.org/delay/6',
'http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.coooom/',
]
def fetch(url):
r = requests.get(url)
r.raise_for_status()
return r.content
def fetch_all(urls):
with futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(fetch, url): url for url in urls}
print("All URLs submitted.")
for future in futures.as_completed(future_to_url):
url = future_to_url[future]
if future.exception() is None:
yield url, future.result()
else:
# print('%r generated an exception: %s' % (
# url, future.exception()))
yield url, None
for url, s in fetch_all(URLS):
status = "{:,.0f} bytes".format(len(s)) if s is not None else "Failed"
print('{}: {}'.format(url, status))
此代码使用 futures.ThreadPoolExecutor
,基于线程。 as_completed()
这里使用了很多魔法。
上面的 python 3.6 代码使用 run_in_executor()
创建了 futures.ProcessPoolExecutor()
,并没有真正使用异步 IO!
如果您真的想继续使用 asyncio,则需要使用支持 asyncio 的 HTTP 客户端,例如 aiohttp。这是一个示例代码:
import asyncio
import aiohttp
async def fetch(session, url):
print("Getting {}...".format(url))
async with session.get(url) as resp:
text = await resp.text()
return "{}: Got {} bytes".format(url, len(text))
async def fetch_all():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, "http://httpbin.org/delay/{}".format(delay))
for delay in (1, 1, 2, 3, 3)]
for task in asyncio.as_completed(tasks):
print(await task)
return "Done."
loop = asyncio.get_event_loop()
resp = loop.run_until_complete(fetch_all())
print(resp)
loop.close()
如您所见,asyncio
也有一个 as_completed()
,现在使用真正的异步 IO,在一个进程上只使用一个线程。
我正在尝试将此 Python2.7 代码重写为新的异步世界秩序:
def get_api_results(func, iterable):
pool = multiprocessing.Pool(5)
for res in pool.map(func, iterable):
yield res
map()
阻塞,直到计算完所有结果,所以我试图将其重写为异步实现,一旦结果准备好就会产生结果。与 map()
一样,return 值必须按与 iterable
相同的顺序 return 编辑。我试过这个(我需要 requests
因为遗留身份验证要求):
import requests
def get(i):
r = requests.get('https://example.com/api/items/%s' % i)
return i, r.json()
async def get_api_results():
loop = asyncio.get_event_loop()
futures = []
for n in range(1, 11):
futures.append(loop.run_in_executor(None, get, n))
async for f in futures:
k, v = await f
yield k, v
for r in get_api_results():
print(r)
但使用 Python 3.6 我得到:
File "scratch.py", line 16, in <module>
for r in get_api_results():
TypeError: 'async_generator' object is not iterable
我怎样才能做到这一点?
您将事件循环放在另一个协程中。不要那样做。事件循环是最外层的异步代码'driver',应该是运行同步的。
如果您需要处理获取的结果,请编写更多协程来执行此操作。他们可以从队列中获取数据,也可以直接驱动获取。
您可以有一个获取和处理结果的主函数,例如:
async def main(loop):
for n in range(1, 11):
future = loop.run_in_executor(None, get, n)
k, v = await future
# do something with the result
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
我也会使用像 aiohttp
这样的异步库使 get()
函数正确地异步,所以你根本不必使用执行程序。
关于您的旧 (2.7) 代码 - 多处理被认为是更简单的线程模块的强大替代品,用于并发处理 CPU 密集型任务,其中线程不能很好地工作。您的代码可能未 CPU 绑定 - 因为它只需要发出 HTTP 请求 - 线程可能足以解决您的问题。
然而,Python 3+ 没有直接使用 threading
,而是有一个很好的模块,叫做 concurrent.futures that with a cleaner API via cool Executor
classes. This module is available also for python 2.7 as an external package。
以下代码适用于 python 2 和 python 3:
# For python 2, first run:
#
# pip install futures
#
from __future__ import print_function
import requests
from concurrent import futures
URLS = [
'http://httpbin.org/delay/1',
'http://httpbin.org/delay/3',
'http://httpbin.org/delay/6',
'http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.coooom/',
]
def fetch(url):
r = requests.get(url)
r.raise_for_status()
return r.content
def fetch_all(urls):
with futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(fetch, url): url for url in urls}
print("All URLs submitted.")
for future in futures.as_completed(future_to_url):
url = future_to_url[future]
if future.exception() is None:
yield url, future.result()
else:
# print('%r generated an exception: %s' % (
# url, future.exception()))
yield url, None
for url, s in fetch_all(URLS):
status = "{:,.0f} bytes".format(len(s)) if s is not None else "Failed"
print('{}: {}'.format(url, status))
此代码使用 futures.ThreadPoolExecutor
,基于线程。 as_completed()
这里使用了很多魔法。
上面的 python 3.6 代码使用 run_in_executor()
创建了 futures.ProcessPoolExecutor()
,并没有真正使用异步 IO!
如果您真的想继续使用 asyncio,则需要使用支持 asyncio 的 HTTP 客户端,例如 aiohttp。这是一个示例代码:
import asyncio
import aiohttp
async def fetch(session, url):
print("Getting {}...".format(url))
async with session.get(url) as resp:
text = await resp.text()
return "{}: Got {} bytes".format(url, len(text))
async def fetch_all():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, "http://httpbin.org/delay/{}".format(delay))
for delay in (1, 1, 2, 3, 3)]
for task in asyncio.as_completed(tasks):
print(await task)
return "Done."
loop = asyncio.get_event_loop()
resp = loop.run_until_complete(fetch_all())
print(resp)
loop.close()
如您所见,asyncio
也有一个 as_completed()
,现在使用真正的异步 IO,在一个进程上只使用一个线程。