asyncio.Queue 当消费者包含在命名列表中时,生产者-消费者流程无法处理异常
asyncio.Queue producer-consumer flow cannot handle exception when consumers contain in a named list
基于 asyncio.Queue
.
的生产者-消费者流程
下面的代码参考了这个 and this blog.
import asyncio
async def produce(q: asyncio.Queue, t):
asyncio.create_task(q.put(t))
print(f'Produced {t}')
async def consume(q: asyncio.Queue):
while True:
res = await q.get()
if res > 2:
print(f'Cannot consume {res}')
raise ValueError(f'{res} too big')
print(f'Consumed {res}')
q.task_done()
async def shutdown(loop, signal=None):
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
print(f"Cancelling {len(tasks)} outstanding tasks")
[task.cancel() for task in tasks]
def handle_exception(loop, context):
msg = context.get("exception", context["message"])
print(f"Caught exception: {msg}")
asyncio.create_task(shutdown(loop))
async def main():
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
[asyncio.create_task(consume(queue)) for _ in range(1)]
# consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]
try:
for i in range(6):
await asyncio.create_task(produce(queue, i))
await queue.join()
except asyncio.exceptions.CancelledError:
print('Cancelled')
asyncio.run(main())
像上面那样包装消费者(没有命名列表)时,输出符合预期:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Caught exception: 3 too big
Produced 4
Cancelling 2 outstanding tasks
Cancelled
但是当给消费者列表一个名字时,这意味着改变里面的代码 main()
像这样:
async def main():
# <-- snip -->
# [asyncio.create_task(consume(queue)) for _ in range(1)]
consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]
# <-- snip -->
程序卡住是这样的:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Produced 5 # <- stuck here, have to manually stop by ^C
似乎 producer
仍在继续生产,因此 queue
中的项目在 ValueError
筹集后继续增长。 handle_exception
永远不会被调用。程序卡在 await queue.join()
.
但为什么给消费者列表命名会改变代码的行为?为什么在命名消费者列表后 handle_exception
永远不会被调用?
这与命名列表无关。您的示例可以简化为:
asyncio.create_task(consume(queue))
# consumer = asyncio.create_task(consume(queue))
这里的重点是函数create_task
returns所在的Task
对象。在一种情况下,它被摧毁,但在另一种情况下则没有。已经给出了很好的答案here and
TL;DR 不要使用 set_exception_handler
来处理任务中的异常。相反,在协程本身中添加必要的 try: ... except: ...
。
问题在于尝试使用 set_exception_handler
来处理异常。该函数是 last-ditch 尝试检测一直传递到事件循环的异常,很可能是程序错误的结果。如果 loop.call_soon
或 loop.call_at
等添加的回调引发异常(但未捕获),将始终调用 set_exception_handler
安装的处理程序。
对于任务,事情更加微妙:任务驱动协程完成,完成后,存储其结果,使其可供等待任务的任何人使用,以add_done_callback
, but also to any call that invokes result()
on the task. (All this is mandated by the contract of Future
安装的回调,Task
是其子类。)当协程引发未处理的异常时,此异常只是另一个结果:当有人等待任务或调用 result()
时,届时将(重新)引发异常。
这就导致了命名和不命名任务对象的区别。如果你不给它们命名,一旦事件循环执行完它们,它们就会被销毁。在它们被销毁时,Python 会注意到没有人访问过它们的结果并将其传递给异常处理程序。另一方面,如果您将它们存储在一个变量中,只要它们被变量引用,它们就不会被销毁,并且没有理由调用事件循环处理程序:就 Python就此而言,您可能决定在任何时候对对象调用 .result()
,访问异常并根据您的程序适当地处理它。
要解决此问题,只需在协程主体周围添加一个 try: ... except: ...
块来自行处理异常。如果你不控制协程,你可以使用add_done_callback()
来检测异常。
基于 asyncio.Queue
.
的生产者-消费者流程
下面的代码参考了这个
import asyncio
async def produce(q: asyncio.Queue, t):
asyncio.create_task(q.put(t))
print(f'Produced {t}')
async def consume(q: asyncio.Queue):
while True:
res = await q.get()
if res > 2:
print(f'Cannot consume {res}')
raise ValueError(f'{res} too big')
print(f'Consumed {res}')
q.task_done()
async def shutdown(loop, signal=None):
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
print(f"Cancelling {len(tasks)} outstanding tasks")
[task.cancel() for task in tasks]
def handle_exception(loop, context):
msg = context.get("exception", context["message"])
print(f"Caught exception: {msg}")
asyncio.create_task(shutdown(loop))
async def main():
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
[asyncio.create_task(consume(queue)) for _ in range(1)]
# consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]
try:
for i in range(6):
await asyncio.create_task(produce(queue, i))
await queue.join()
except asyncio.exceptions.CancelledError:
print('Cancelled')
asyncio.run(main())
像上面那样包装消费者(没有命名列表)时,输出符合预期:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Caught exception: 3 too big
Produced 4
Cancelling 2 outstanding tasks
Cancelled
但是当给消费者列表一个名字时,这意味着改变里面的代码 main()
像这样:
async def main():
# <-- snip -->
# [asyncio.create_task(consume(queue)) for _ in range(1)]
consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]
# <-- snip -->
程序卡住是这样的:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Produced 5 # <- stuck here, have to manually stop by ^C
似乎 producer
仍在继续生产,因此 queue
中的项目在 ValueError
筹集后继续增长。 handle_exception
永远不会被调用。程序卡在 await queue.join()
.
但为什么给消费者列表命名会改变代码的行为?为什么在命名消费者列表后 handle_exception
永远不会被调用?
这与命名列表无关。您的示例可以简化为:
asyncio.create_task(consume(queue))
# consumer = asyncio.create_task(consume(queue))
这里的重点是函数create_task
returns所在的Task
对象。在一种情况下,它被摧毁,但在另一种情况下则没有。已经给出了很好的答案here and
TL;DR 不要使用 set_exception_handler
来处理任务中的异常。相反,在协程本身中添加必要的 try: ... except: ...
。
问题在于尝试使用 set_exception_handler
来处理异常。该函数是 last-ditch 尝试检测一直传递到事件循环的异常,很可能是程序错误的结果。如果 loop.call_soon
或 loop.call_at
等添加的回调引发异常(但未捕获),将始终调用 set_exception_handler
安装的处理程序。
对于任务,事情更加微妙:任务驱动协程完成,完成后,存储其结果,使其可供等待任务的任何人使用,以add_done_callback
, but also to any call that invokes result()
on the task. (All this is mandated by the contract of Future
安装的回调,Task
是其子类。)当协程引发未处理的异常时,此异常只是另一个结果:当有人等待任务或调用 result()
时,届时将(重新)引发异常。
这就导致了命名和不命名任务对象的区别。如果你不给它们命名,一旦事件循环执行完它们,它们就会被销毁。在它们被销毁时,Python 会注意到没有人访问过它们的结果并将其传递给异常处理程序。另一方面,如果您将它们存储在一个变量中,只要它们被变量引用,它们就不会被销毁,并且没有理由调用事件循环处理程序:就 Python就此而言,您可能决定在任何时候对对象调用 .result()
,访问异常并根据您的程序适当地处理它。
要解决此问题,只需在协程主体周围添加一个 try: ... except: ...
块来自行处理异常。如果你不控制协程,你可以使用add_done_callback()
来检测异常。