asyncio run multiple tasks while preserving order and handling errors

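My function run_tasks(all_tasks, window_size) takes a generator of asyncio tasks and returns their values, while:

  1. Running each window (of size window_size) from all_tasks concurrently
  2. Preserving the order of the returned results (the result of all_tasks[i] is results[i])
  3. Handling exceptions for each run

My current implementation: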

import asyncio
from itertools import islice


# run all tasks and return their results in the same order
# window is the max number of tasks that will run in parallel
def run_tasks(all_tasks, window_size=4):
    loop = asyncio.get_event_loop()

    while True:
        window_tasks = list(islice(all_tasks, window_size))
        if not window_tasks:
            break

        futures = asyncio.wait(window_tasks, loop=loop)
        finished, unfinished = loop.run_until_complete(futures)

        # sort finished tasks by their launch order.
        # removing this line makes returned tasks unordered
        finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))

        for finished_task in finished:
            try:
                yield finished_task.result()
            except Exception as e:
                yield repr(e)

# Example Usage:

# a coroutine that sometimes raises an exception
async def sleepy(i):
    print(f'{i} started')
    await asyncio.sleep(10 - i)
    print(f'{i} finished')
    if i == 5:
        raise ValueError('5 is the worst')
    return i

# a generator of tasks
all_tasks = (sleepy(i) for i in range(10))

for result in list(run_tasks(all_tasks)):
    print(result)

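Problem

The problem with my implementation is that I can't find a way to sort the tasks without accessing f._coro, which is an internal attribute of the asyncio.Task object.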

# removing this line makes returned tasks unordered
finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))

I could use asyncio.gather(*tasks), but that doesn't handle errors.
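For illustration, here is a minimal sketch of that default behaviour, assuming Python 3.7+ (for asyncio.run). The coroutine work and the helper gather_default are hypothetical names, not part of the code above. With default arguments, the first exception propagates out of gather and the results of the other coroutines are not returned to the caller.

```python
import asyncio


async def work(i):
    # Hypothetical coroutine: fails for i == 1, otherwise returns i.
    await asyncio.sleep(0.01)
    if i == 1:
        raise ValueError('1 failed')
    return i


async def gather_default():
    # With return_exceptions left at its default (False), the first
    # exception raised by any awaitable propagates to the caller.
    try:
        return await asyncio.gather(work(0), work(1), work(2))
    except ValueError as e:
        return f'gather raised: {e!r}'


outcome = asyncio.run(gather_default())
print(outcome)
```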

I'm open to suggestions on how to implement these three properties for run_tasks() without accessing f._coro.

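asyncio.gather can return errors if you specify its keyword argument return_exceptions. To distinguish real exceptions from exception objects returned as the result of a coroutine, you can wrap your window_tasks with tasks using ensure_future: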
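# wrap each coroutine in a Task; the list keeps the launch order
futures = [asyncio.ensure_future(t, loop=loop) for t in window_tasks]
# gather's loop argument was removed in Python 3.10, so it is not passed here;
# gather picks up the loop from the futures it is given
gathered = asyncio.gather(*futures, return_exceptions=True)
loop.run_until_complete(gathered)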

for fut in futures:
    try:
        yield fut.result()
    except Exception as e:
        yield repr(e)
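
Put together, the approach might look like the sketch below. It makes some assumptions that differ from the question's code: it targets Python 3.7+ and uses asyncio.run, run_tasks is written as a coroutine that collects results into a list instead of a generator that drives the loop itself, and the sleep times are scaled down so the example finishes quickly. Order is preserved because the results are read from the futures list, which is in launch order, rather than from the set returned by asyncio.wait.

```python
import asyncio
from itertools import islice


async def run_tasks(all_coros, window_size=4):
    """Run coroutines window by window; return results in launch order."""
    results = []
    all_coros = iter(all_coros)
    while True:
        window = list(islice(all_coros, window_size))
        if not window:
            break
        # Wrap each coroutine in a Task so we hold an ordered handle to it.
        futures = [asyncio.ensure_future(c) for c in window]
        # return_exceptions=True stops gather from raising on the first
        # failure, so every task in the window runs to completion.
        await asyncio.gather(*futures, return_exceptions=True)
        for fut in futures:
            try:
                # result() re-raises only if the coroutine itself raised.
                results.append(fut.result())
            except Exception as e:
                results.append(repr(e))
    return results


async def sleepy(i):
    # Same shape as the question's example, with shorter sleeps.
    await asyncio.sleep((10 - i) / 100)
    if i == 5:
        raise ValueError('5 is the worst')
    return i


results = asyncio.run(run_tasks(sleepy(i) for i in range(10)))
print(results)
```

Because fut.result() only raises when the coroutine raised, a coroutine that returns an exception object as its value would be appended as that object rather than as its repr, which is the distinction the answer describes.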