asyncio CancelledError 和 KeyboardInterrupt
asyncio CancelledError and KeyboardInterrupt
我正在尝试用 2 种方法来停止一个正在运行的无限循环:
- supervisor_1: 任务以编程方式取消
- supervisor_2: 使用 Ctrl+C 停止任务
虽然 supervisor_2 在被中断时不会抛出任何错误,但我无法让 supervisor_1 避免出现 `Task was destroyed but it is pending!` 这条警告。知道为什么吗?
代码如下:
import asyncio
import aioredis
from functools import partial
class Listener:
    """Runs endless counting loops that share one redis connection."""

    def __init__(self, redis_conn):
        # Connection object that forever() closes once a loop is cancelled.
        self.redis_conn = redis_conn

    async def forever(self, loop_name):
        """Print ``<loop_name>: <n>`` once per second until cancelled.

        When the loop is cancelled, the CancelledError is caught here and
        not re-raised: the shared connection is closed and the coroutine
        returns normally.
        """
        ticks = 0
        try:
            while True:
                line = '{}: {}'.format(loop_name, ticks)
                print(line)
                ticks += 1
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('Task Cancelled')
            connection = self.redis_conn
            connection.close()
            await connection.wait_closed()
async def supervisor_1(redis_conn):
    """Start both listener loops, wait two seconds, then request cancellation.

    ``redis_conn`` is awaited to obtain the connection handed to the
    Listener.  The cancelled gather future is not awaited afterwards, so
    this coroutine returns right after calling ``cancel()``.
    """
    connection = await redis_conn
    listener = Listener(connection)
    loops = asyncio.gather(listener.forever('loop_1'),
                           listener.forever('loop_2'))
    task = asyncio.ensure_future(loops)
    await asyncio.sleep(2)
    task.cancel()
async def supervisor_2(redis_conn):
    """Run both listener loops and wait on them until they finish.

    ``redis_conn`` is awaited to obtain the connection handed to the
    Listener.
    """
    connection = await redis_conn
    listener = Listener(connection)
    loop_names = ('loop_1', 'loop_2')
    await asyncio.gather(*(listener.forever(name) for name in loop_names))
if __name__ == '__main__':
    # Awaitable that yields the redis pool; it is awaited inside the supervisor.
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)
    loop = asyncio.get_event_loop()
    # Swap supervisor_2 for supervisor_1 here to try the other variant.
    run = partial(supervisor_2, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interruped !')
        task.cancel()
        # Keep driving the loop until the cancelled task has really finished.
        # run_forever() only returns after loop.stop() is called, which
        # nothing here does, so wait on the task itself instead.
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            # Raised when the task ends as cancelled; that is the expected
            # outcome of the cancel() request above.
            pass
    finally:
        loop.close()
@update:
感谢@Gerasimov,这是一个解决问题的版本,但不知何故仍然会不时地在 KeyboardInterrupt 上引发错误:
async def supervisor(redis_conn):
    """Run both listener loops for ten seconds, then cancel and await them.

    ``redis_conn`` is awaited to obtain the connection handed to the
    Listener.  A CancelledError raised while awaiting the cancelled gather
    future is swallowed.
    """
    connection = await redis_conn
    listener = Listener(connection)
    loops = asyncio.gather(listener.forever('loop_1'),
                           listener.forever('loop_2'))
    task = asyncio.ensure_future(loops)
    await asyncio.sleep(10)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
async def kill_tasks():
    """Cancel every other unfinished task and wait for each one to end.

    The task executing this coroutine is left out on purpose: the set of
    all tasks includes it, and cancelling and then awaiting itself would
    abort the cleanup instead of completing it.
    """
    # asyncio.all_tasks() / asyncio.current_task() are the Python 3.7+
    # spellings; fall back to the Task classmethods on older interpreters.
    all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
    current_task = (getattr(asyncio, 'current_task', None)
                    or asyncio.Task.current_task)
    me = current_task()
    pending = [task for task in all_tasks() if task is not me]
    # Request every cancellation first so the tasks can wind down together.
    for task in pending:
        task.cancel()
    for task in pending:
        # cancel() only requests cancellation; awaiting lets it take effect.
        with suppress(asyncio.CancelledError):
            await task
和
if __name__ == '__main__':
    # Awaitable that yields the redis pool; it is awaited inside supervisor().
    pool = aioredis.create_pool(('localhost', 5003), db=1)
    event_loop = asyncio.get_event_loop()
    main_task = asyncio.ensure_future(supervisor(redis_conn=pool))
    try:
        event_loop.run_until_complete(main_task)
    except KeyboardInterrupt:
        print('Interruped !')
        # Ctrl+C: cancel whatever is still pending before closing the loop.
        cleanup = kill_tasks()
        event_loop.run_until_complete(cleanup)
    finally:
        event_loop.close()
`task.cancel()` 本身并不会结束任务:它只是通知任务应当在其内部抛出 `CancelledError`,然后立即返回。你应该先调用它,然后等待(await)任务真正被取消(此时它会抛出 `CancelledError`)。
你也不应该在任务内部抑制 `CancelledError`。
请参阅相关回答(原文此处为链接),我在其中尝试展示了处理任务的几种不同方式。例如,要取消某个任务并等待它真正被取消,可以这样做:
from contextlib import suppress
task = ...  # remember: the task itself must not suppress CancelledError
task.cancel()  # returns immediately; we still have to await the task until it raises CancelledError
with suppress(asyncio.CancelledError):
    await task  # or loop.run_until_complete(task) if this happens after the event loop has stopped
# Once the CancelledError has been awaited and handled, the task is really
# finished and the event loop can be closed without a warning.
我正在尝试用 2 种方法来停止一个正在运行的无限循环:
- supervisor_1: 任务以编程方式取消
- supervisor_2: 使用 Ctrl+C 停止任务
虽然 supervisor_2 在被中断时不会抛出任何错误,但我无法让 supervisor_1 避免出现 `Task was destroyed but it is pending!` 这条警告。知道为什么吗?
代码如下:
import asyncio
import aioredis
from functools import partial
class Listener:
    """Runs endless counting loops that share one redis connection."""

    def __init__(self, redis_conn):
        # Connection object that forever() closes once a loop is cancelled.
        self.redis_conn = redis_conn

    async def forever(self, loop_name):
        """Print ``<loop_name>: <n>`` once per second until cancelled.

        When the loop is cancelled, the CancelledError is caught here and
        not re-raised: the shared connection is closed and the coroutine
        returns normally.
        """
        ticks = 0
        try:
            while True:
                line = '{}: {}'.format(loop_name, ticks)
                print(line)
                ticks += 1
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('Task Cancelled')
            connection = self.redis_conn
            connection.close()
            await connection.wait_closed()
async def supervisor_1(redis_conn):
    """Start both listener loops, wait two seconds, then request cancellation.

    ``redis_conn`` is awaited to obtain the connection handed to the
    Listener.  The cancelled gather future is not awaited afterwards, so
    this coroutine returns right after calling ``cancel()``.
    """
    connection = await redis_conn
    listener = Listener(connection)
    loops = asyncio.gather(listener.forever('loop_1'),
                           listener.forever('loop_2'))
    task = asyncio.ensure_future(loops)
    await asyncio.sleep(2)
    task.cancel()
async def supervisor_2(redis_conn):
    """Run both listener loops and wait on them until they finish.

    ``redis_conn`` is awaited to obtain the connection handed to the
    Listener.
    """
    connection = await redis_conn
    listener = Listener(connection)
    loop_names = ('loop_1', 'loop_2')
    await asyncio.gather(*(listener.forever(name) for name in loop_names))
if __name__ == '__main__':
    # Awaitable that yields the redis pool; it is awaited inside the supervisor.
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)
    loop = asyncio.get_event_loop()
    # Swap supervisor_2 for supervisor_1 here to try the other variant.
    run = partial(supervisor_2, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interruped !')
        task.cancel()
        # Keep driving the loop until the cancelled task has really finished.
        # run_forever() only returns after loop.stop() is called, which
        # nothing here does, so wait on the task itself instead.
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            # Raised when the task ends as cancelled; that is the expected
            # outcome of the cancel() request above.
            pass
    finally:
        loop.close()
@update:
感谢@Gerasimov,这是一个解决问题的版本,但不知何故仍然会不时地在 KeyboardInterrupt 上引发错误:
async def supervisor(redis_conn):
    """Run both listener loops for ten seconds, then cancel and await them.

    ``redis_conn`` is awaited to obtain the connection handed to the
    Listener.  A CancelledError raised while awaiting the cancelled gather
    future is swallowed.
    """
    connection = await redis_conn
    listener = Listener(connection)
    loops = asyncio.gather(listener.forever('loop_1'),
                           listener.forever('loop_2'))
    task = asyncio.ensure_future(loops)
    await asyncio.sleep(10)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
async def kill_tasks():
    """Cancel every other unfinished task and wait for each one to end.

    The task executing this coroutine is left out on purpose: the set of
    all tasks includes it, and cancelling and then awaiting itself would
    abort the cleanup instead of completing it.
    """
    # asyncio.all_tasks() / asyncio.current_task() are the Python 3.7+
    # spellings; fall back to the Task classmethods on older interpreters.
    all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
    current_task = (getattr(asyncio, 'current_task', None)
                    or asyncio.Task.current_task)
    me = current_task()
    pending = [task for task in all_tasks() if task is not me]
    # Request every cancellation first so the tasks can wind down together.
    for task in pending:
        task.cancel()
    for task in pending:
        # cancel() only requests cancellation; awaiting lets it take effect.
        with suppress(asyncio.CancelledError):
            await task
和
if __name__ == '__main__':
    # Awaitable that yields the redis pool; it is awaited inside supervisor().
    pool = aioredis.create_pool(('localhost', 5003), db=1)
    event_loop = asyncio.get_event_loop()
    main_task = asyncio.ensure_future(supervisor(redis_conn=pool))
    try:
        event_loop.run_until_complete(main_task)
    except KeyboardInterrupt:
        print('Interruped !')
        # Ctrl+C: cancel whatever is still pending before closing the loop.
        cleanup = kill_tasks()
        event_loop.run_until_complete(cleanup)
    finally:
        event_loop.close()
`task.cancel()` 本身并不会结束任务:它只是通知任务应当在其内部抛出 `CancelledError`,然后立即返回。你应该先调用它,然后等待(await)任务真正被取消(此时它会抛出 `CancelledError`)。
你也不应该在任务内部抑制 `CancelledError`。
请参阅相关回答(原文此处为链接),我在其中尝试展示了处理任务的几种不同方式。例如,要取消某个任务并等待它真正被取消,可以这样做:
from contextlib import suppress
task = ...  # remember: the task itself must not suppress CancelledError
task.cancel()  # returns immediately; we still have to await the task until it raises CancelledError
with suppress(asyncio.CancelledError):
    await task  # or loop.run_until_complete(task) if this happens after the event loop has stopped
# Once the CancelledError has been awaited and handled, the task is really
# finished and the event loop can be closed without a warning.