How to cancel all remaining tasks in gather if one fails?

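If one task in a gather raises an exception, the others are still allowed to continue.

Well, that's not exactly what I need. I want to distinguish between fatal errors, which should cancel all remaining tasks, and non-fatal errors, which should be logged while the other tasks are allowed to continue.

Here is my failed attempt to implement this: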

from asyncio import gather, get_event_loop, sleep

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

async def my_sleep(secs):
    await sleep(secs)
    if secs == 5:
        raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
    print(f'Slept for {secs}secs.')

async def main():
    try:
        sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
        await sleepers
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        sleepers.cancel()
    finally:
        await sleep(5)

get_event_loop().run_until_complete(main())

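(The await sleep in finally is there to keep the interpreter from exiting immediately, which would by itself cancel all tasks.)

Weirdly, calling cancel on the gather does not actually cancel it!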

PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.

I am very surprised by this behavior, since it seems to contradict the documentation, which states:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

Return a future aggregating results from the given coroutine objects or futures.

(...)

Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled. (...)

What am I missing here? How can I cancel the remaining tasks?

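The problem with your implementation is that it calls sleepers.cancel() after sleepers has already raised. Technically, the future returned by gather() is already in a completed state at that point, so cancelling it must be a no-op.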
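The no-op behaviour can be observed on a plain future, without gather() involved. The following is a minimal sketch, not taken from the original code: it completes a future with an exception (a RuntimeError chosen only for illustration) and then tries to cancel it.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Complete the future with an exception, much like gather() does
    # with its own future when one of the children fails.
    fut.set_exception(RuntimeError("child failed"))

    # cancel() on a future that is already done returns False and changes nothing.
    was_cancelled = fut.cancel()
    # Retrieve the exception so asyncio does not warn that it was never retrieved.
    stored = fut.exception()
    return was_cancelled, fut.cancelled(), type(stored).__name__


if __name__ == "__main__":
    print(asyncio.run(main()))  # (False, False, 'RuntimeError')
```

The stored exception stays in place, which is why awaiting sleepers again would raise the same error rather than CancelledError.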

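To correct the code, you just need to cancel the children yourself instead of trusting gather's future to do it. Of course, coroutines themselves cannot be cancelled, so you need to convert them to tasks first (which gather would do anyway, so you are not doing any extra work). For example: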

import asyncio

async def main():
    # Wrap the coroutines in tasks up front so they can be cancelled individually.
    tasks = [asyncio.ensure_future(my_sleep(secs))
             for secs in [2, 5, 7]]
    try:
        await asyncio.gather(*tasks)
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        for t in tasks:
            t.cancel()
    finally:
        await asyncio.sleep(5)
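The question also asks for non-fatal errors to be logged while the other tasks carry on. One possible way to get both behaviours is to wrap each coroutine so that non-fatal errors never reach gather(). This is only a sketch under assumptions: FatalError, MinorError, worker and log_minor are hypothetical names invented for the illustration.

```python
import asyncio
import logging


class FatalError(Exception):
    """Hypothetical: an error that should cancel all sibling tasks."""


class MinorError(Exception):
    """Hypothetical: an error that should only be logged."""


async def worker(name, delay, error=None):
    # Hypothetical stand-in for real work: sleep, then fail or return.
    await asyncio.sleep(delay)
    if error is not None:
        raise error
    return name


async def log_minor(coro):
    # Log and swallow non-fatal errors so gather() never sees them.
    try:
        return await coro
    except MinorError as exc:
        logging.warning("non-fatal error: %r", exc)
        return None


async def main():
    tasks = [
        asyncio.ensure_future(log_minor(worker("a", 0.01, MinorError("a")))),
        asyncio.ensure_future(log_minor(worker("b", 0.05, FatalError("b")))),
        asyncio.ensure_future(log_minor(worker("c", 0.5))),
    ]
    try:
        await asyncio.gather(*tasks)
    except FatalError:
        for t in tasks:
            t.cancel()  # no-op for tasks that have already finished
        # Wait until the cancellations have actually been processed.
        await asyncio.gather(*tasks, return_exceptions=True)
    return tasks


if __name__ == "__main__":
    for task in asyncio.run(main()):
        print(task.cancelled())
```

Here task "a" fails with a logged MinorError and finishes normally, task "b" raises FatalError, and task "c" is cancelled before its sleep completes.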

I am very surprised by this behavior since it seems to be contradictory to the documentation[...]

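The initial stumbling block with gather is that it doesn't really run tasks; it is just a helper that waits for them to finish. For this reason gather doesn't bother to cancel the remaining tasks if one of them fails with an exception. It just abandons the wait and propagates the exception, leaving the remaining tasks to proceed in the background. This was reported as a bug, but wasn't fixed for backward compatibility and because the behavior is documented and unchanged from the beginning. But here we have another wart: the documentation explicitly promises being able to cancel the returned future. Your code does exactly that and it doesn't work, without it being obvious why (at least it took me a while to figure it out, and required reading the source). It turns out that the contract of Future actually prevents this from working. By the time you call cancel(), the future returned by gather has already completed, and cancelling a completed future is meaningless; it is just a no-op. (The reason is that a completed future has a well-defined result that could have been observed by outside code. Cancelling it would change its result, which is not allowed.)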

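In other words, the documentation is not wrong, because cancelling would have worked if you had done it before await sleepers completed. However, it is misleading, because it appears to allow cancelling gather() in this important use case of one of its awaitables raising, but in reality it doesn't.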
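For contrast, here is a small sketch of the case the documentation does cover: cancelling the gather() future while it is still pending. The three asyncio.sleep(10) children are placeholders chosen for the illustration.

```python
import asyncio


async def main():
    children = [asyncio.ensure_future(asyncio.sleep(10)) for _ in range(3)]
    outer = asyncio.gather(*children)

    await asyncio.sleep(0)  # let the children start; outer is still pending
    outer.cancel()          # outer is not done yet, so the cancellation propagates

    try:
        await outer
    except asyncio.CancelledError:
        pass
    # Make sure every child has finished processing its cancellation.
    await asyncio.gather(*children, return_exceptions=True)
    return [child.cancelled() for child in children]


if __name__ == "__main__":
    print(asyncio.run(main()))  # [True, True, True]
```

The only difference from the code in the question is timing: here cancel() is called before the outer future has a result.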

Problems like this that pop up when using gather are the reason why many people eagerly await (no pun intended) trio-style nurseries in asyncio.

You can create your own custom gather function

It cancels all of its children when any exception occurs:

import asyncio

async def gather(*tasks, **kwargs):
    # ensure_future() wraps coroutines in tasks and passes tasks/futures through.
    tasks = [asyncio.ensure_future(task) for task in tasks]
    try:
        return await asyncio.gather(*tasks, **kwargs)
    except BaseException:
        # Cancel every child (a no-op for those already done), then re-raise.
        for task in tasks:
            task.cancel()
        raise


# If a() or b() raises an exception, both are immediately cancelled
a_result, b_result = await gather(a(), b())
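A self-contained way to try the wrapper out might look like the sketch below. The names gather_or_cancel, fail_after and record_cancel are hypothetical and exist only for this illustration; the wrapper is the same idea as above under a different name, so that it does not shadow asyncio.gather.

```python
import asyncio


async def gather_or_cancel(*aws):
    # Hypothetical name; same idea as the custom gather() wrapper.
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        for task in tasks:
            task.cancel()
        raise


async def fail_after(delay):
    await asyncio.sleep(delay)
    raise ValueError("boom")


async def record_cancel(delay, log):
    # Records whether it ran to completion or was cancelled.
    try:
        await asyncio.sleep(delay)
        log.append("finished")
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


async def main():
    log = []
    try:
        await gather_or_cancel(fail_after(0.01), record_cancel(0.5, log))
    except ValueError:
        log.append("caught")
    await asyncio.sleep(0.05)  # give the cancelled sibling a chance to unwind
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['caught', 'cancelled']
```

Note that the caller sees the exception before the sibling has finished unwinding: cancel() only requests cancellation, and the cancelled task processes it the next time the event loop runs it.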