asyncio 运行 多个任务,同时保留顺序和处理错误
asyncio run multiple tasks while preserving order and handling errors
我的函数 run_tasks(all_tasks, window_size)
需要 asyncio
个任务的生成器和 returns 它们的值,同时:
- 运行 每个 window(大小
window_size
)来自 all_tasks
同时
- 保留返回结果的顺序(
all_tasks[i]
结果为 results[i]
)
- 处理每个异常 运行
我当前的实现:
import asyncio
from itertools import islice
# run all tasks and return their results in the same order
# window is the max number of tasks that will run in parallel
def run_tasks(all_tasks, window_size=4):
loop = asyncio.get_event_loop()
while True:
window_tasks = list(islice(all_tasks, window_size))
if not window_tasks:
break
futures = asyncio.wait(window_tasks, loop=loop)
finished, unfinished = loop.run_until_complete(futures)
# sort finished tasks by their launch order.
# removing this line makes returned tasks unordered
finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))
for finished_task in finished:
try:
yield finished_task.result()
except Exception as e:
yield repr(e)
# Example Usage:
# a coroutine that sometime raises exception
async def sleepy(i):
print(f'{i} started')
await asyncio.sleep(10 - i)
print(f'{i} finished')
if i == 5:
raise ValueError('5 is the worst')
return i
# a generator of tasks
all_tasks = (sleepy(i) for i in range(10))
for result in list(run_tasks(all_tasks)):
print(result)
问题
我的实现的问题是,如果不访问 asyncio.Task
对象的内部 属性 f._coro
,我找不到对任务进行排序的方法。
# removing this line makes returned tasks unordered
finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))
我可以使用 asyncio.gather(*tasks)
但这不会处理错误。
我愿意接受有关如何在不访问 f._coro
.
的情况下为 run_tasks()
实现这三个属性的建议
asyncio.gather
can return 如果指定它的关键字参数 return_exceptions
就会出错。为了区分真正的异常和协程导致的异常对象 return,您可以使用 ensure_future
:
将 window_tasks
与任务包装起来
futures = [asyncio.ensure_future(t, loop=loop) for t in window_tasks]
gathered = asyncio.gather(*futures, loop=loop, return_exceptions=True)
loop.run_until_complete(gathered)
for fut in futures:
try:
yield fut.result()
except Exception as e:
yield repr(e)
我的函数 run_tasks(all_tasks, window_size)
需要 asyncio
个任务的生成器和 returns 它们的值,同时:
- 运行 每个 window(大小
window_size
)来自all_tasks
同时 - 保留返回结果的顺序(
all_tasks[i]
结果为results[i]
) - 处理每个异常 运行
我当前的实现:
import asyncio
from itertools import islice
# run all tasks and return their results in the same order
# window is the max number of tasks that will run in parallel
def run_tasks(all_tasks, window_size=4):
loop = asyncio.get_event_loop()
while True:
window_tasks = list(islice(all_tasks, window_size))
if not window_tasks:
break
futures = asyncio.wait(window_tasks, loop=loop)
finished, unfinished = loop.run_until_complete(futures)
# sort finished tasks by their launch order.
# removing this line makes returned tasks unordered
finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))
for finished_task in finished:
try:
yield finished_task.result()
except Exception as e:
yield repr(e)
# Example Usage:
# a coroutine that sometime raises exception
async def sleepy(i):
print(f'{i} started')
await asyncio.sleep(10 - i)
print(f'{i} finished')
if i == 5:
raise ValueError('5 is the worst')
return i
# a generator of tasks
all_tasks = (sleepy(i) for i in range(10))
for result in list(run_tasks(all_tasks)):
print(result)
问题
我的实现的问题是,如果不访问 asyncio.Task
对象的内部 属性 f._coro
,我找不到对任务进行排序的方法。
# removing this line makes returned tasks unordered
finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))
我可以使用 asyncio.gather(*tasks)
但这不会处理错误。
我愿意接受有关如何在不访问 f._coro
.
run_tasks()
实现这三个属性的建议
asyncio.gather
can return 如果指定它的关键字参数 return_exceptions
就会出错。为了区分真正的异常和协程导致的异常对象 return,您可以使用 ensure_future
:
window_tasks
与任务包装起来
futures = [asyncio.ensure_future(t, loop=loop) for t in window_tasks]
gathered = asyncio.gather(*futures, loop=loop, return_exceptions=True)
loop.run_until_complete(gathered)
for fut in futures:
try:
yield fut.result()
except Exception as e:
yield repr(e)