Please explain "Task was destroyed but it is pending!" after cancelling tasks
I'm learning asyncio with Python 3.4.2 and I'm using it to continuously listen on an IPC bus, while gbulb listens on DBus.
I created a function listen_to_ipc_channel_layer that continuously listens for incoming messages on the IPC channel and passes each message to message_handler.
I'm also listening for SIGTERM and SIGINT. When I send SIGTERM to the Python process running the code you find at the bottom, the script should terminate gracefully.
The problem I'm having is the following warning:
got signal 15: exit
Task was destroyed but it is pending!
task: <Task pending coro=<listen_to_ipc_channel_layer() running at /opt/mainloop-test.py:23> wait_for=<Future cancelled>>
Process finished with exit code 0
…with the following code:
import asyncio
import gbulb
import signal
import asgi_ipc as asgi


def main():
    asyncio.async(listen_to_ipc_channel_layer())
    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)
    # Start listening on the Linux IPC bus for incoming messages
    loop.run_forever()
    loop.close()


@asyncio.coroutine
def listen_to_ipc_channel_layer():
    """Listens to the Linux IPC bus for messages"""
    while True:
        message_handler(message=channel_layer.receive(["my_channel"]))
        try:
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break


def ask_exit():
    loop = asyncio.get_event_loop()
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()


if __name__ == "__main__":
    gbulb.install()
    # Connect to the IPC bus
    channel_layer = asgi.IPCChannelLayer(prefix="my_channel")
    main()
I still understand only very little about asyncio, but I think I know what's going on. While waiting in yield from asyncio.sleep(0.1), the signal handler caught the SIGTERM, and in the process task.cancel() was called.
Shouldn't that trigger the CancelledError inside the while True: loop? (It doesn't, but that's how I understood "Calling cancel() will throw a CancelledError to the wrapped coroutine".)
Eventually loop.stop() is called, which stops the loop without waiting for yield from asyncio.sleep(0.1) to return a result, let alone for the whole listen_to_ipc_channel_layer coroutine to finish.
Please correct me if I'm wrong.
I think the only thing I need to do is make my program wait for yield from asyncio.sleep(0.1) to return a result and/or for the coroutine to break out of the while loop and finish.
I believe I'm confusing a lot of things. Please help me sort them out so I can figure out how to shut down the event loop gracefully without the warning.
The problem is in closing the loop right after cancelling the tasks. As the cancel() docs state:
"This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop."
Take this piece of code:
import asyncio
import signal


async def pending_doom():
    await asyncio.sleep(2)
    print(">> Cancelling tasks now")
    for task in asyncio.Task.all_tasks():
        task.cancel()

    print(">> Done cancelling tasks")
    asyncio.get_event_loop().stop()


def ask_exit():
    for task in asyncio.Task.all_tasks():
        task.cancel()


async def looping_coro():
    print("Executing coroutine")
    while True:
        try:
            await asyncio.sleep(0.25)
        except asyncio.CancelledError:
            print("Got CancelledError")
            break

        print("Done waiting")

    print("Done executing coroutine")
    asyncio.get_event_loop().stop()


def main():
    asyncio.async(pending_doom())
    asyncio.async(looping_coro())
    loop = asyncio.get_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    loop.run_forever()

    # I had to manually remove the handlers to
    # avoid an exception on BaseEventLoop.__del__
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.remove_signal_handler(sig)


if __name__ == '__main__':
    main()
Notice that ask_exit cancels the tasks but does not stop the loop; on the next cycle looping_coro() stops it. The output if you cancel it is:
Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
^CGot CancelledError
Done executing coroutine
Notice how pending_doom cancels the tasks and then stops the loop immediately afterwards. If you let it run until the pending_doom coroutine wakes from its sleep, you can see the same warning you're getting:
Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
>> Cancelling tasks now
>> Done cancelling tasks
Task was destroyed but it is pending!
task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>
What the warning means is that the loop didn't have time to finish all the tasks.
This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop.
There is no chance for a "next cycle" of the loop in your approach. To get it right, you should move the stop operation into a separate, non-looping coroutine, to give your loop a chance to finish.
The second important thing is raising the CancelledError.
Unlike Future.cancel(), this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception.
Immediately after this method is called, cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called).
So, after your cleanup, the coroutine has to raise CancelledError in order to be marked as cancelled.
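A small self-contained sketch of that difference (the task names here are made up, not part of the question's code): a task that swallows the CancelledError finishes normally and cancelled() stays False, while a task that re-raises it is marked as cancelled.

import asyncio


async def swallow():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        pass            # swallowing the error: the task ends normally


async def reraise():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        raise           # re-raising: the task is marked as cancelled


async def demo():
    t1 = asyncio.ensure_future(swallow())
    t2 = asyncio.ensure_future(reraise())
    await asyncio.sleep(0)      # let both tasks start and reach their sleep
    t1.cancel()
    t2.cancel()
    await asyncio.wait([t1, t2])
    print(t1.cancelled(), t2.cancelled())    # prints: False True


asyncio.get_event_loop().run_until_complete(demo())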
Using an extra coroutine to stop the loop is not an issue, because it isn't looping and completes immediately after being executed.
import asyncio
import signal


def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(listen_to_ipc_channel_layer())

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    loop.run_forever()
    print("Close")
    loop.close()


@asyncio.coroutine
def listen_to_ipc_channel_layer():
    while True:
        try:
            print("Running")
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError as e:
            print("Break it out")
            raise e  # Raise a proper error


# Stop the loop concurrently
@asyncio.coroutine
def exit():
    loop = asyncio.get_event_loop()
    print("Stop")
    loop.stop()


def ask_exit():
    for task in asyncio.Task.all_tasks():
        task.cancel()
    asyncio.ensure_future(exit())


if __name__ == "__main__":
    main()
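For what it's worth, on Python 3.7+ this shutdown can be written more compactly, because asyncio.run() cancels any tasks that are still pending when the main coroutine returns and waits for them before closing the loop. A minimal sketch, with a hypothetical worker() standing in for listen_to_ipc_channel_layer():

import asyncio
import signal


async def worker():
    # Hypothetical stand-in for listen_to_ipc_channel_layer()
    try:
        while True:
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("worker cancelled, cleaning up")
        raise                                    # re-raise so the task is marked as cancelled


async def main():
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    task = asyncio.ensure_future(worker())
    await stop.wait()                                    # run until a signal arrives
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)   # let the cancellation finish


if __name__ == "__main__":
    asyncio.run(main())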
The reason this happens is as explained by @Yeray Diaz Diaz.
In my case, I wanted to cancel all the tasks that were not done after the first one completed, so I ended up cancelling the extras and then using loop._run_once() to run the loop a bit longer and allow them to stop:
loop = asyncio.get_event_loop()
job = asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
tasks_finished, tasks_pending = loop.run_until_complete(job)
tasks_done = [t for t in tasks_finished if t.exception() is None]
if len(tasks_done) == 0:
    raise Exception("Failed for all tasks.")
assert len(tasks_done) == 1
data = tasks_done[0].result()
for t in tasks_pending:
    t.cancel()

while not all([t.done() for t in tasks_pending]):
    loop._run_once()
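A note on that last step: loop._run_once() is a private method and can change between Python versions. The same effect, letting the loop deliver the CancelledError into each pending task so it can actually finish, can also be achieved with the public API. A small self-contained sketch, where the sleeper() tasks are hypothetical placeholders:

import asyncio


async def sleeper(seconds):
    # Hypothetical placeholder task
    await asyncio.sleep(seconds)
    return seconds


loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(sleeper(s)) for s in (0.1, 5.0, 5.0)]

done, pending = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))

for t in pending:
    t.cancel()

# Drive the loop until the cancelled tasks have processed their CancelledError;
# return_exceptions=True keeps gather() from re-raising it here.
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.close()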
I got this message and I believe it was caused by garbage collection of a pending task. The Python developers were debating whether tasks created in asyncio should keep strong references and decided they shouldn't (after 2 days of looking into this problem I strongly disagree! ... see the discussion here: https://bugs.python.org/issue21163).
I created this utility for myself to keep a strong reference to tasks and clean it up automatically (still need to test it thoroughly)...
import asyncio

# create a strong reference to tasks since asyncio doesn't do this for you
task_references = set()


def register_ensure_future(coro):
    task = asyncio.ensure_future(coro)
    task_references.add(task)

    # Setup cleanup of strong reference on task completion...
    def _on_completion(f):
        task_references.remove(f)
    task.add_done_callback(_on_completion)
    return task
In my opinion, tasks should be strongly referenced for as long as they are active! But asyncio doesn't do that for you, so you can be in for some nasty surprises once GC kicks in, along with long debugging sessions.
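A usage sketch for the helper above, reusing register_ensure_future from that snippet; echo_forever() is a hypothetical long-running task that stays alive because task_references holds a strong reference to it:

import asyncio


async def echo_forever(label):
    # Hypothetical long-running task; the strong reference in task_references
    # keeps it from being garbage collected while it is still pending.
    while True:
        print(label)
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
register_ensure_future(echo_forever("tick"))
try:
    loop.run_forever()
finally:
    loop.close()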