asyncio.gather vs asyncio.wait
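asyncio.gather and asyncio.wait seem to have similar uses: I have a bunch of async things that I want to execute/wait for (not necessarily waiting for one to finish before the next one starts). They use a different syntax and differ in some details, but it seems very un-pythonic to me to have two functions with such a huge overlap in functionality. What am I missing?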
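Although similar in general cases ("run and get results for many tasks"), each function has some specific functionality for other cases:
asyncio.gather()
Returns a Future instance, allowing high-level grouping of tasks: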

    import asyncio
    from pprint import pprint
    import random


    async def coro(tag):
        print(">", tag)
        await asyncio.sleep(random.uniform(1, 3))
        print("<", tag)
        return tag


    async def main():
        # Each gather() call returns a Future that groups its awaitables.
        group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
        group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
        group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])
        # A group is itself awaitable, so groups can be gathered again.
        all_groups = asyncio.gather(group1, group2, group3)
        results = await all_groups
        pprint(results)


    asyncio.run(main())

All tasks in a group can be cancelled by calling group2.cancel() or even all_groups.cancel(). See also .gather(..., return_exceptions=True).
asyncio.wait()
Supports stopping the wait after the first task is done or after a specified timeout, allowing lower-level precision of operations:

    import asyncio
    import random


    async def coro(tag):
        print(">", tag)
        await asyncio.sleep(random.uniform(0.5, 5))
        print("<", tag)
        return tag


    async def main():
        # wait() needs Tasks/Futures; bare coroutines are rejected since Python 3.11.
        tasks = [asyncio.create_task(coro(i)) for i in range(1, 11)]

        print("Get first result:")
        finished, unfinished = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            print(task.result())
        print("unfinished:", len(unfinished))

        print("Get more results in 2 seconds:")
        finished2, unfinished2 = await asyncio.wait(unfinished, timeout=2)
        for task in finished2:
            print(task.result())
        print("unfinished2:", len(unfinished2))

        print("Get all other results:")
        if unfinished2:  # wait() raises ValueError on an empty set
            finished3, unfinished3 = await asyncio.wait(unfinished2)
            for task in finished3:
                print(task.result())


    asyncio.run(main())

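asyncio.wait is lower level than asyncio.gather.
As the name suggests, asyncio.gather mainly focuses on gathering the results. It waits on a bunch of futures and returns their results in the given order.
asyncio.wait just waits on the futures. Instead of giving you the results directly, it gives you the done and pending tasks. You have to collect the values manually.
Moreover, with wait you can specify whether to wait for all futures to finish or just the first one.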
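To make that difference concrete, here is a minimal sketch. The helper `delayed` and its delays are made up for illustration and are not part of the answer above. gather() hands back a list of results in argument order, while wait() hands back two sets of tasks whose results you read yourself.

```python
import asyncio


async def delayed(value, delay):
    # Hypothetical helper: return `value` after `delay` seconds.
    await asyncio.sleep(delay)
    return value


async def with_gather():
    # Results come back in argument order, not in completion order.
    return await asyncio.gather(
        delayed("a", 0.03), delayed("b", 0.01), delayed("c", 0.02))


async def with_wait():
    tasks = [asyncio.create_task(delayed(v, d))
             for v, d in (("a", 0.03), ("b", 0.01), ("c", 0.02))]
    # wait() returns two sets of tasks; sets carry no ordering.
    done, pending = await asyncio.wait(tasks)
    return {t.result() for t in done}, pending


gather_results = asyncio.run(with_gather())
wait_results, still_pending = asyncio.run(with_wait())
print(gather_results)
print(sorted(wait_results), len(still_pending))
```

If the order of results matters with wait(), keep the original list of tasks around and read the results from that list instead of from the `done` set.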
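I also noticed that you can provide a group of coroutines in wait() by simply specifying a list (this works up to Python 3.10; passing bare coroutines to wait() was deprecated in 3.8 and removed in 3.11):

    result = loop.run_until_complete(asyncio.wait([
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ]))

Whereas grouping in gather() is done by just specifying multiple coroutines:

    result = loop.run_until_complete(asyncio.gather(
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ))
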
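On Python 3.11 and later, wait() rejects bare coroutine objects, so the list form needs each coroutine wrapped in a task first. A minimal sketch of that, where `say` is a stand-in defined here purely for illustration (its real body is not shown in the answer) and the delays are shortened:

```python
import asyncio


async def say(what, delay):
    # Stand-in for the `say` coroutine used above (an assumption).
    await asyncio.sleep(delay)
    return what


async def main():
    # create_task() schedules each coroutine and returns a Task that wait() accepts.
    tasks = [
        asyncio.create_task(say("first hello", 0.02)),
        asyncio.create_task(say("second hello", 0.01)),
        asyncio.create_task(say("third hello", 0.04)),
    ]
    done, pending = await asyncio.wait(tasks)
    # Reading results from the original list keeps the submission order.
    return [t.result() for t in tasks], len(pending)


results, pending_count = asyncio.run(main())
print(results, pending_count)
```

gather() needs no such wrapping: it accepts coroutines directly and schedules them as tasks itself.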
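A very important distinction, which is easy to miss, is the default behavior of these two functions when it comes to exceptions.
I'll use this example to simulate a coroutine that sometimes raises an exception -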

    import asyncio


    async def a_flaky_tsk(i):
        await asyncio.sleep(i)  # bit of fuzz to simulate a real-world example

        if i % 2 == 0:
            print(i, "ok")
        else:
            print(i, "crashed!")
            raise ValueError


    async def main():
        coros = [a_flaky_tsk(i) for i in range(10)]
        await asyncio.gather(*coros)


    asyncio.run(main())

Output -

    0 ok
    1 crashed!
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module>
        asyncio.run(main())
      File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run
        return loop.run_until_complete(main)
      File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
        return future.result()
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main
        await asyncio.gather(*coros)
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError

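As you can see, the coros after index 1 never got to finish. gather() raised the first exception to the caller right away; it does not cancel the remaining awaitables itself, but here main() exited and asyncio.run() cancelled whatever was still running.
But await asyncio.wait(coros) keeps executing the tasks even if some of them fail (on Python 3.11+, wrap each coroutine with asyncio.create_task() first) -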

    0 ok
    1 crashed!
    2 ok
    3 crashed!
    4 ok
    5 crashed!
    6 ok
    7 crashed!
    8 ok
    9 crashed!
    Task exception was never retrieved
    future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError
    Task exception was never retrieved
    future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError
    Task exception was never retrieved
    future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError
    Task exception was never retrieved
    future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError
    Task exception was never retrieved
    future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError

Of course, this behavior can be changed for both by using -

    asyncio.gather(..., return_exceptions=True)

or,

    asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)

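As a rough illustration of the second option, the sketch below shows what return_when=asyncio.FIRST_EXCEPTION does: wait() returns as soon as one task raises and leaves the rest in the pending set. The coroutine `flaky` and its short delays are assumptions made for this example, not code from the answer.

```python
import asyncio


async def flaky(i):
    # Hypothetical task: odd inputs fail, even inputs succeed.
    await asyncio.sleep(i * 0.05)
    if i % 2:
        raise ValueError(i)
    return i


async def main():
    tasks = [asyncio.create_task(flaky(i)) for i in range(4)]
    # Returns as soon as any task raises (or once all tasks finish cleanly).
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    failed = [t for t in done if t.exception() is not None]
    # Pending tasks are still running; cancel them if they are no longer needed.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending), [repr(t.exception()) for t in failed]


done_count, pending_count, errors = asyncio.run(main())
print(done_count, pending_count, errors)
```

Note that wait() still does not raise here; the exception is only read from the finished task via exception().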
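But it doesn't end here!
Notice the "Task exception was never retrieved" messages in the logs above.
asyncio.wait() won't re-raise exceptions from the child tasks until you await them individually. (The stack traces in the logs are just messages; they cannot be caught!)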

    done, pending = await asyncio.wait(coros)
    for tsk in done:
        try:
            await tsk
        except Exception as e:
            print("I caught:", repr(e))

Output -

    0 ok
    1 crashed!
    2 ok
    3 crashed!
    4 ok
    5 crashed!
    6 ok
    7 crashed!
    8 ok
    9 crashed!
    I caught: ValueError()
    I caught: ValueError()
    I caught: ValueError()
    I caught: ValueError()
    I caught: ValueError()

On the other hand, to catch exceptions with asyncio.gather(), you must -

    results = await asyncio.gather(*coros, return_exceptions=True)
    for result_or_exc in results:
        if isinstance(result_or_exc, Exception):
            print("I caught:", repr(result_or_exc))

(Same output as before)
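In addition to all the previous answers, I would like to describe how gather() and wait() behave differently when they are cancelled.
Gather cancellation
If gather() is cancelled, all submitted awaitables that have not completed yet are also cancelled.
Wait cancellation
If the wait()ing task is cancelled, it simply throws a CancelledError and the waited tasks remain intact.
Simple example: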

    import asyncio


    async def task(arg):
        await asyncio.sleep(5)
        return arg


    async def cancel_waiting_task(work_task, waiting_task):
        await asyncio.sleep(2)
        waiting_task.cancel()
        try:
            await waiting_task
            print("Waiting done")
        except asyncio.CancelledError:
            print("Waiting task cancelled")

        try:
            res = await work_task
            print(f"Work result: {res}")
        except asyncio.CancelledError:
            print("Work task cancelled")


    async def main():
        print("asyncio.wait()")
        work_task = asyncio.create_task(task("done"))
        # wait() is a coroutine, so wrap it in a task to be able to cancel it.
        waiting = asyncio.create_task(asyncio.wait({work_task}))
        await cancel_waiting_task(work_task, waiting)

        print("----------------")
        print("asyncio.gather()")
        work_task = asyncio.create_task(task("done"))
        # gather() already returns a Future, which can be cancelled directly.
        waiting = asyncio.gather(work_task)
        await cancel_waiting_task(work_task, waiting)


    asyncio.run(main())

Output:

    asyncio.wait()
    Waiting task cancelled
    Work result: done
    ----------------
    asyncio.gather()
    Waiting task cancelled
    Work task cancelled

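Sometimes it is necessary to combine the functionality of wait() and gather(). For example, we want to wait for at least one task to complete and then cancel the remaining pending tasks, and if the waiting itself is cancelled, cancel all pending tasks as well.
As a real example, say we have a disconnect event and a work task. We want to wait for the result of the work task, but cancel it if the connection is lost. Or we make several requests in parallel and, once at least one response has completed, cancel all the others.
It could be done this way: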

    import asyncio
    from typing import Optional, Tuple, Set


    async def wait_any(
            tasks: Set[asyncio.Future], *, timeout: Optional[int] = None,
    ) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
        tasks_to_cancel: Set[asyncio.Future] = set()
        try:
            done, tasks_to_cancel = await asyncio.wait(
                tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
            )
            return done, tasks_to_cancel
        except asyncio.CancelledError:
            # The wait itself was cancelled: cancel every submitted task.
            tasks_to_cancel = tasks
            raise
        finally:
            for task in tasks_to_cancel:
                task.cancel()


    async def task():
        await asyncio.sleep(5)


    async def cancel_waiting_task(work_task, waiting_task):
        await asyncio.sleep(2)
        waiting_task.cancel()
        try:
            await waiting_task
            print("Waiting done")
        except asyncio.CancelledError:
            print("Waiting task cancelled")

        try:
            res = await work_task
            print(f"Work result: {res}")
        except asyncio.CancelledError:
            print("Work task cancelled")


    async def check_tasks(waiting_task, working_task, waiting_conn_lost_task):
        try:
            await waiting_task
            print("waiting is done")
        except asyncio.CancelledError:
            print("waiting is cancelled")

        try:
            await waiting_conn_lost_task
            print("connection is lost")
        except asyncio.CancelledError:
            print("waiting connection lost is cancelled")

        try:
            await working_task
            print("work is done")
        except asyncio.CancelledError:
            print("work is cancelled")


    async def work_done_case():
        working_task = asyncio.create_task(task())
        connection_lost_event = asyncio.Event()
        waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
        waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
        await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


    async def conn_lost_case():
        working_task = asyncio.create_task(task())
        connection_lost_event = asyncio.Event()
        waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
        waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
        await asyncio.sleep(2)
        connection_lost_event.set()  # <---
        await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


    async def cancel_waiting_case():
        working_task = asyncio.create_task(task())
        connection_lost_event = asyncio.Event()
        waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
        waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
        await asyncio.sleep(2)
        waiting_task.cancel()  # <---
        await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


    async def main():
        print("Work done")
        print("-------------------")
        await work_done_case()
        print("\nConnection lost")
        print("-------------------")
        await conn_lost_case()
        print("\nCancel waiting")
        print("-------------------")
        await cancel_waiting_case()


    asyncio.run(main())

Output:

    Work done
    -------------------
    waiting is done
    waiting connection lost is cancelled
    work is done

    Connection lost
    -------------------
    waiting is done
    connection is lost
    work is cancelled

    Cancel waiting
    -------------------
    waiting is cancelled
    waiting connection lost is cancelled
    work is cancelled