asyncio.Queue 当消费者包含在命名列表中时,生产者-消费者流程无法处理异常

asyncio.Queue producer-consumer flow cannot handle exception when consumers contain in a named list

基于 asyncio.Queue.
的生产者-消费者流程 下面的代码参考了这个 and this blog.

import asyncio

async def produce(q: asyncio.Queue, t):
    asyncio.create_task(q.put(t))
    print(f'Produced {t}')

async def consume(q: asyncio.Queue):
    while True:
        res = await q.get()
        if res > 2:
            print(f'Cannot consume {res}')
            raise ValueError(f'{res} too big')
        print(f'Consumed {res}')
        q.task_done()

async def shutdown(loop, signal=None):
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    print(f"Cancelling {len(tasks)} outstanding tasks")
    [task.cancel() for task in tasks]

def handle_exception(loop, context):
    msg = context.get("exception", context["message"])
    print(f"Caught exception: {msg}")
    asyncio.create_task(shutdown(loop))

async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

    [asyncio.create_task(consume(queue)) for _ in range(1)]
    # consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]

    try:
        for i in range(6):
            await asyncio.create_task(produce(queue, i))
        await queue.join()
    except asyncio.exceptions.CancelledError:
        print('Cancelled')


asyncio.run(main())

像上面那样包装消费者(没有命名列表)时,输出符合预期:

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Caught exception: 3 too big
Produced 4
Cancelling 2 outstanding tasks
Cancelled

但是当给消费者列表一个名字时,这意味着改变里面的代码 main() 像这样:

async def main():
    # <-- snip -->

    # [asyncio.create_task(consume(queue)) for _ in range(1)]
    consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]

    # <-- snip -->

程序卡住是这样的:

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Produced 5  # <- stuck here, have to manually stop by ^C

似乎 producer 仍在继续生产,因此 queue 中的项目在 ValueError 筹集后继续增长。 handle_exception 永远不会被调用。程序卡在 await queue.join().

但为什么给消费者列表命名会改变代码的行为?为什么在命名消费者列表后 handle_exception 永远不会被调用?

这与命名列表无关。您的示例可以简化为:

asyncio.create_task(consume(queue))
# consumer = asyncio.create_task(consume(queue))

这里的重点是函数create_taskreturns所在的Task对象。在一种情况下,它被摧毁,但在另一种情况下则没有。已经给出了很好的答案here and

TL;DR 不要使用 set_exception_handler 来处理任务中的异常。相反,在协程本身中添加必要的 try: ... except: ...

问题在于尝试使用 set_exception_handler 来处理异常。该函数是 last-ditch 尝试检测一直传递到事件循环的异常,很可能是程序错误的结果。如果 loop.call_soonloop.call_at 等添加的回调引发异常(但未捕获),将始终调用 set_exception_handler 安装的处理程序。

对于任务,事情更加微妙:任务驱动协程完成,完成后,存储其结果,使其可供等待任务的任何人使用,以add_done_callback, but also to any call that invokes result() on the task. (All this is mandated by the contract of Future 安装的回调,Task 是其子类。)当协程引发未处理的异常时,此异常只是另一个结果:当有人等待任务或调用 result() 时,届时将(重新)引发异常。

这就导致了命名和不命名任务对象的区别。如果你不给它们命名,一旦事件循环执行完它们,它们就会被销毁。在它们被销毁时,Python 会注意到没有人访问过它们的结果并将其传递给异常处理程序。另一方面,如果您将它们存储在一个变量中,只要它们被变量引用,它们就不会被销毁,并且没有理由调用事件循环处理程序:就 Python就此而言,您可能决定在任何时候对对象调用 .result(),访问异常并根据您的程序适当地处理它。

要解决此问题,只需在协程主体周围添加一个 try: ... except: ... 块来自行处理异常。如果你不控制协程,你可以使用add_done_callback()来检测异常。