创建在协程完成时产生协程结果的生成器
Create generator that yields coroutine results as the coroutines finish
目前,我有一个低效的同步生成器,它按顺序发出许多 HTTP 请求并产生结果。我想使用 asyncio
and aiohttp
to parallelise the requests and thereby speed up this generator, but I want to keep it as an ordinary generator (not a PEP 525 async generator) 这样调用它的非异步代码就不需要修改了。如何创建这样的生成器?
asyncio.as_completed()
按照输入期货完成的顺序采用协程或期货的可迭代和 returns 期货的可迭代。 通常,你会遍历它的结果和 await
来自 async
函数内部的成员...
import asyncio
async def first():
await asyncio.sleep(5)
return 'first'
async def second():
await asyncio.sleep(1)
return 'second'
async def third():
await asyncio.sleep(3)
return 'third'
async def main():
for future in asyncio.as_completed([first(), second(), third()]):
print(await future)
# Prints 'second', then 'third', then 'first'
asyncio.run(main())
...但是为了这个问题的目的,我们想要的是能够从一个普通的生成器中产生这些结果,这样普通的同步代码就可以在不知道 async
函数的情况下使用它们正在引擎盖下使用。我们可以通过调用 loop.run_until_complete()
我们 as_completed
调用产生的期货来做到这一点...
import asyncio
async def first():
await asyncio.sleep(5)
return 'first'
async def second():
await asyncio.sleep(1)
return 'second'
async def third():
await asyncio.sleep(3)
return 'third'
def ordinary_generator():
loop = asyncio.get_event_loop()
for future in asyncio.as_completed([first(), second(), third()]):
yield loop.run_until_complete(future)
# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
print(element)
通过这种方式,我们以不需要调用者将任何函数定义为 async
或者甚至不需要知道 [=21] 的方式将我们的异步代码公开给 non-async-land =] 在后台使用 asyncio
。
作为在某些情况下提供更大灵活性的 ordinary_generator()
的替代实现,我们可以使用 FIRST_COMPLETED
标志重复调用 asyncio.wait()
而不是循环 as_completed()
:
import concurrent.futures
def ordinary_generator():
loop = asyncio.get_event_loop()
pending = [first(), second(), third()]
while pending:
done, pending = loop.run_until_complete(
asyncio.wait(
pending,
return_when=concurrent.futures.FIRST_COMPLETED
)
)
for job in done:
yield job.result()
这种维护 pending
作业列表的方法的优点是我们可以调整它以动态地将作业添加到 pending
列表中。这在我们的异步作业可以将不可预测数量的进一步作业添加到队列的用例中很有用 - 就像网络蜘蛛跟踪它访问的每个页面上的所有链接一样。
一个警告:上面的方法假设我们从主线程调用同步代码,在这种情况下 get_event_loop
保证给我们一个循环,我们不需要 .close
它。如果我们希望 ordinary_generator
可从 non-main 线程使用,尤其是之前可能创建了事件循环的线程,那么生活会变得更加艰难,因为我们不能依赖 get_event_loop
(它在任何还没有事件循环的 non-main 线程上引发 RuntimeError
)。在那种情况下,我能想到的最简单的事情就是将 new 线程分拆到 运行 我们的 asyncio
代码,并通过队列与其通信:
def ordinary_generator():
sentinel = object()
queue = Queue()
def thread_entry_point():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for future in asyncio.as_completed([first(), second(), third()]):
try:
queue.put(loop.run_until_complete(future))
except Exception as e:
queue.put((sentinel, e))
break
loop.close()
queue.put(sentinel)
Thread(target=thread_entry_point).start()
while True:
val = queue.get()
if val is sentinel:
return
if isinstance(val, tuple) and len(val) == 2 and val[0] is sentinel:
raise val[1]
yield val
(将倒数第二个示例中的 run_until_complete
的使用与最后一个示例中的额外线程的使用结合起来留作任何需要这样做的 reader 的练习。)
Mark 的回答很好,但我想贡献一个不依赖于低级事件循环方法的不同实现。
主要区别在于,不是执行 yield
,而是提供可用于处理结果的回调:
import asyncio
import random
async def do_stuff():
proc_time = round(random.random(), 2)
print('START: ', proc_time)
await asyncio.sleep(proc_time)
return proc_time
def concurrent_stuff(awaitables, callback):
# Must be async to wait
async def _as_completed():
for coro in asyncio.as_completed(awaitables):
result = await coro
callback(result) # Send result to callback.
# Perform the async calls inside a regular method
asyncio.run(_as_completed())
def when_done(result):
print('FINISHED: ', result)
def main():
awaitables = [do_stuff() for _ in range(5)]
concurrent_stuff(awaitables, when_done)
main()
# START: 0.56
# START: 0.98
# START: 0.39
# START: 0.23
# START: 0.94
# FINISHED: 0.23
# FINISHED: 0.39
# FINISHED: 0.56
# FINISHED: 0.94
# FINISHED: 0.98
目前,我有一个低效的同步生成器,它按顺序发出许多 HTTP 请求并产生结果。我想使用 asyncio
and aiohttp
to parallelise the requests and thereby speed up this generator, but I want to keep it as an ordinary generator (not a PEP 525 async generator) 这样调用它的非异步代码就不需要修改了。如何创建这样的生成器?
asyncio.as_completed()
按照输入期货完成的顺序采用协程或期货的可迭代和 returns 期货的可迭代。 通常,你会遍历它的结果和 await
来自 async
函数内部的成员...
import asyncio
async def first():
await asyncio.sleep(5)
return 'first'
async def second():
await asyncio.sleep(1)
return 'second'
async def third():
await asyncio.sleep(3)
return 'third'
async def main():
for future in asyncio.as_completed([first(), second(), third()]):
print(await future)
# Prints 'second', then 'third', then 'first'
asyncio.run(main())
...但是为了这个问题的目的,我们想要的是能够从一个普通的生成器中产生这些结果,这样普通的同步代码就可以在不知道 async
函数的情况下使用它们正在引擎盖下使用。我们可以通过调用 loop.run_until_complete()
我们 as_completed
调用产生的期货来做到这一点...
import asyncio
async def first():
await asyncio.sleep(5)
return 'first'
async def second():
await asyncio.sleep(1)
return 'second'
async def third():
await asyncio.sleep(3)
return 'third'
def ordinary_generator():
loop = asyncio.get_event_loop()
for future in asyncio.as_completed([first(), second(), third()]):
yield loop.run_until_complete(future)
# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
print(element)
通过这种方式,我们以不需要调用者将任何函数定义为 async
或者甚至不需要知道 [=21] 的方式将我们的异步代码公开给 non-async-land =] 在后台使用 asyncio
。
作为在某些情况下提供更大灵活性的 ordinary_generator()
的替代实现,我们可以使用 FIRST_COMPLETED
标志重复调用 asyncio.wait()
而不是循环 as_completed()
:
import concurrent.futures
def ordinary_generator():
loop = asyncio.get_event_loop()
pending = [first(), second(), third()]
while pending:
done, pending = loop.run_until_complete(
asyncio.wait(
pending,
return_when=concurrent.futures.FIRST_COMPLETED
)
)
for job in done:
yield job.result()
这种维护 pending
作业列表的方法的优点是我们可以调整它以动态地将作业添加到 pending
列表中。这在我们的异步作业可以将不可预测数量的进一步作业添加到队列的用例中很有用 - 就像网络蜘蛛跟踪它访问的每个页面上的所有链接一样。
一个警告:上面的方法假设我们从主线程调用同步代码,在这种情况下 get_event_loop
保证给我们一个循环,我们不需要 .close
它。如果我们希望 ordinary_generator
可从 non-main 线程使用,尤其是之前可能创建了事件循环的线程,那么生活会变得更加艰难,因为我们不能依赖 get_event_loop
(它在任何还没有事件循环的 non-main 线程上引发 RuntimeError
)。在那种情况下,我能想到的最简单的事情就是将 new 线程分拆到 运行 我们的 asyncio
代码,并通过队列与其通信:
def ordinary_generator():
sentinel = object()
queue = Queue()
def thread_entry_point():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for future in asyncio.as_completed([first(), second(), third()]):
try:
queue.put(loop.run_until_complete(future))
except Exception as e:
queue.put((sentinel, e))
break
loop.close()
queue.put(sentinel)
Thread(target=thread_entry_point).start()
while True:
val = queue.get()
if val is sentinel:
return
if isinstance(val, tuple) and len(val) == 2 and val[0] is sentinel:
raise val[1]
yield val
(将倒数第二个示例中的 run_until_complete
的使用与最后一个示例中的额外线程的使用结合起来留作任何需要这样做的 reader 的练习。)
Mark 的回答很好,但我想贡献一个不依赖于低级事件循环方法的不同实现。
主要区别在于,不是执行 yield
,而是提供可用于处理结果的回调:
import asyncio
import random
async def do_stuff():
proc_time = round(random.random(), 2)
print('START: ', proc_time)
await asyncio.sleep(proc_time)
return proc_time
def concurrent_stuff(awaitables, callback):
# Must be async to wait
async def _as_completed():
for coro in asyncio.as_completed(awaitables):
result = await coro
callback(result) # Send result to callback.
# Perform the async calls inside a regular method
asyncio.run(_as_completed())
def when_done(result):
print('FINISHED: ', result)
def main():
awaitables = [do_stuff() for _ in range(5)]
concurrent_stuff(awaitables, when_done)
main()
# START: 0.56
# START: 0.98
# START: 0.39
# START: 0.23
# START: 0.94
# FINISHED: 0.23
# FINISHED: 0.39
# FINISHED: 0.56
# FINISHED: 0.94
# FINISHED: 0.98