在线程中停止异步
Stopping asyncio in thread
我有一个启动(并最终停止)asyncio 循环的线程,如下所示:
class Ook(Thread):
[…]
def run(self):
try:
self._logger.debug("Asyncio loop runs forever.")
self.loop.run_forever()
except Exception as ex:
# We do want to see ALL unhandled exceptions here.
self._logger.error("Exception raised: %s", ex)
self._logger.exception(ex)
finally:
# Stop the loop!
self._logger.warn('Closing asyncio event loop.')
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
self.loop.close()
def stop(self):
self._logger.info("Thread has been asked to stop!")
if self.loop.is_running():
self._logger.debug("Asked running asyncio loop to stop.")
for task in asyncio.Task.all_tasks():
self.loop.call_soon_threadsafe(task.cancel)
self.loop.call_soon_threadsafe(self.loop.stop)
检查是否有效的愚蠢(?)单元测试是
@pytest.mark.asyncio
async def test_start_and_stop_thread():
sut = Ook()
sut.start()
if sut.isAlive():
sut.stop()
sut.join()
assert not sut.isAlive()
assert not sut.loop.is_running()
这不起作用,因为 asyncio.CancelledError
...在 stop
方法中的任何地方捕获它们似乎没有帮助。
如果我 运行 未标记 @pytest.mark.asyncio
的测试代码,我会收到一条消息说 Task was destroyed but it is pending!
。
我做错了什么?
我们这里有几个问题。
Task.cancel() 在协程中引发了一个 asyncio.CancelledError()。您应该在协程中添加一个 "try/exec CancelledError" 来处理该异常。
另一种方法是在 def stop 中抑制 CancelledError 异常:
from asyncio import CancelledError
from contextlib import suppress
def stop(self):
self._logger.info("Thread has been asked to stop!")
if self.loop.is_running():
self._logger.debug("Asked running asyncio loop to stop.")
self.loop.call_soon_threadsafe(self.loop.stop)
for task in asyncio.Task.all_tasks():
task.cancel()
with suppress(CancelledError):
loop.run_until_complete(task)
记得关闭所有异步发电机
loop.run_until_complete(loop.shutdown_asyncgens())
我有一个启动(并最终停止)asyncio 循环的线程,如下所示:
class Ook(Thread):
[…]
def run(self):
try:
self._logger.debug("Asyncio loop runs forever.")
self.loop.run_forever()
except Exception as ex:
# We do want to see ALL unhandled exceptions here.
self._logger.error("Exception raised: %s", ex)
self._logger.exception(ex)
finally:
# Stop the loop!
self._logger.warn('Closing asyncio event loop.')
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
self.loop.close()
def stop(self):
self._logger.info("Thread has been asked to stop!")
if self.loop.is_running():
self._logger.debug("Asked running asyncio loop to stop.")
for task in asyncio.Task.all_tasks():
self.loop.call_soon_threadsafe(task.cancel)
self.loop.call_soon_threadsafe(self.loop.stop)
检查是否有效的愚蠢(?)单元测试是
@pytest.mark.asyncio
async def test_start_and_stop_thread():
sut = Ook()
sut.start()
if sut.isAlive():
sut.stop()
sut.join()
assert not sut.isAlive()
assert not sut.loop.is_running()
这不起作用,因为 asyncio.CancelledError
...在 stop
方法中的任何地方捕获它们似乎没有帮助。
如果我 运行 未标记 @pytest.mark.asyncio
的测试代码,我会收到一条消息说 Task was destroyed but it is pending!
。
我做错了什么?
我们这里有几个问题。
Task.cancel() 在协程中引发了一个 asyncio.CancelledError()。您应该在协程中添加一个 "try/exec CancelledError" 来处理该异常。
另一种方法是在 def stop 中抑制 CancelledError 异常:
from asyncio import CancelledError from contextlib import suppress def stop(self): self._logger.info("Thread has been asked to stop!") if self.loop.is_running(): self._logger.debug("Asked running asyncio loop to stop.") self.loop.call_soon_threadsafe(self.loop.stop) for task in asyncio.Task.all_tasks(): task.cancel() with suppress(CancelledError): loop.run_until_complete(task)
记得关闭所有异步发电机
loop.run_until_complete(loop.shutdown_asyncgens())