Python Asyncio - RuntimeError: Cannot close a running event loop
Python Asyncio - RuntimeError: Cannot close a running event loop
我正在尝试解决此错误:RuntimeError: Cannot close a running event loop
在我的 asyncio 进程中。我相信它正在发生,因为在任务仍未决时出现故障,然后我尝试关闭事件循环。我在想我需要在关闭事件循环之前等待剩余的响应,但我不确定如何在我的特定情况下正确地完成它。
def start_job(self):
if self.auth_expire_timestamp < get_timestamp():
api_obj = api_handler.Api('Api Name', self.dbObj)
self.api_auth_resp = api_obj.get_auth_response()
self.api_attr = api_obj.get_attributes()
try:
self.queue_manager(self.do_stuff(json_data))
except aiohttp.ServerDisconnectedError as e:
logging.info("Reconnecting...")
api_obj = api_handler.Api('API Name', self.dbObj)
self.api_auth_resp = api_obj.get_auth_response()
self.api_attr = api_obj.get_attributes()
self.run_eligibility()
async def do_stuff(self, data):
tasks = []
async with aiohttp.ClientSession() as session:
for row in data:
task = asyncio.ensure_future(self.async_post('url', session, row))
tasks.append(task)
result = await asyncio.gather(*tasks)
self.load_results(result)
def queue_manager(self, method):
self.loop = asyncio.get_event_loop()
future = asyncio.ensure_future(method)
self.loop.run_until_complete(future)
async def async_post(self, resource, session, data):
async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
resp = []
try:
headers = response.headers['foo']
content = await response.read()
resp.append(headers)
resp.append(content)
except KeyError as e:
logging.error('KeyError at async_post response')
logging.error(e)
return resp
def shutdown(self):
//need to do something here to await the remaining tasks and then I need to re-start a new event loop, which i think i can do, just don't know how to appropriately stop the current one.
self.loop.close()
return True
如何处理错误并正确关闭事件循环,以便我可以启动一个新事件循环并重新启动整个程序并继续。
编辑:
这就是我现在正在尝试的,基于 。不幸的是,这个错误很少发生,所以除非我能强制执行,否则我将不得不等待,看看它是否有效。在我的 queue_manager
方法中,我将其更改为:
try:
self.loop.run_until_complete(future)
except Exception as e:
future.cancel()
self.loop.run_until_complete(future)
future.exception()
更新:
我摆脱了 shutdown()
方法并将其添加到我的 queue_manager()
方法中,它似乎工作正常:
try:
self.loop.run_until_complete(future)
except Exception as e:
future.cancel()
self.check_in_records()
self.reconnect()
self.start_job()
future.exception()
要回答最初陈述的问题,不需要 close()
一个 运行ning 循环,您可以为整个程序重复使用同一个循环。
根据更新中的代码,您的 queue_manager
可能如下所示:
try:
self.loop.run_until_complete(future)
except Exception as e:
self.check_in_records()
self.reconnect()
self.start_job()
取消 future
没有必要,据我所知没有任何效果。这与 which specifically reacts to KeyboardInterrupt
, special because it is raised by asyncio itself. KeyboardInterrupt
can be propagated by run_until_complete
without the future having actually completed. Handling Ctrl-C correctly in asyncio is very hard or even impossible (see here 的细节不同),但幸运的是问题根本不是关于 Ctrl-C,而是关于协程引发的异常。 (请注意,KeyboardInterrupt
不继承自 Exception
,因此在 Ctrl-C 的情况下,except 主体甚至不会执行。)
I was canceling the future because in this instance there are remaining tasks pending and i want to essentially remove those tasks and start a fresh event loop.
这是一件正确的事情,但是(更新的)问题中的代码只取消了一个未来,已经传递给 run_until_complete
的那个。回想一下,future 是稍后提供的结果值的占位符。一旦提供了值,就可以通过调用 future.result()
来检索它。如果 future 的 "value" 是一个异常,future.result()
将引发该异常。 run_until_complete
有约定,只要给定的未来产生一个值,它就会 运行 事件循环,然后它 returns 该值。如果 "value" 实际上是要引发的异常,那么 run_until_complete
将重新引发它。例如:
loop = asyncio.get_event_loop()
fut = loop.create_future()
loop.call_soon(fut.set_exception, ZeroDivisionError)
# raises ZeroDivisionError, as that is the future's result,
# manually set
loop.run_until_complete(fut)
当所讨论的 future 实际上是一个 Task
,一个将协程包装到 Future
的异步特定对象时,这种 future 的结果就是协程返回的对象。如果协程引发异常,那么检索结果将重新引发它,run_until_complete
:
async def fail():
1/0
loop = asyncio.get_event_loop()
fut = loop.create_task(fail())
# raises ZeroDivisionError, as that is the future's result,
# because the coroutine raises it
loop.run_until_complete(fut)
处理任务时,run_until_complete
完成意味着协程也已完成,返回值或引发异常,由run_until_complete
返回或引发决定。
另一方面,通过安排恢复任务和暂停它的 await
表达式引发 CancelledError
来取消任务。除非任务专门捕获并抑制此异常(行为良好的 asyncio 代码不应该这样做),否则任务将停止执行并且 CancelledError
将成为其结果。但是,如果调用 cancel()
时协程已经完成,则 cancel()
无法执行任何操作,因为没有待处理的 await
可将 CancelledError
注入。
我正在尝试解决此错误:RuntimeError: Cannot close a running event loop
在我的 asyncio 进程中。我相信它正在发生,因为在任务仍未决时出现故障,然后我尝试关闭事件循环。我在想我需要在关闭事件循环之前等待剩余的响应,但我不确定如何在我的特定情况下正确地完成它。
def start_job(self):
if self.auth_expire_timestamp < get_timestamp():
api_obj = api_handler.Api('Api Name', self.dbObj)
self.api_auth_resp = api_obj.get_auth_response()
self.api_attr = api_obj.get_attributes()
try:
self.queue_manager(self.do_stuff(json_data))
except aiohttp.ServerDisconnectedError as e:
logging.info("Reconnecting...")
api_obj = api_handler.Api('API Name', self.dbObj)
self.api_auth_resp = api_obj.get_auth_response()
self.api_attr = api_obj.get_attributes()
self.run_eligibility()
async def do_stuff(self, data):
tasks = []
async with aiohttp.ClientSession() as session:
for row in data:
task = asyncio.ensure_future(self.async_post('url', session, row))
tasks.append(task)
result = await asyncio.gather(*tasks)
self.load_results(result)
def queue_manager(self, method):
self.loop = asyncio.get_event_loop()
future = asyncio.ensure_future(method)
self.loop.run_until_complete(future)
async def async_post(self, resource, session, data):
async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
resp = []
try:
headers = response.headers['foo']
content = await response.read()
resp.append(headers)
resp.append(content)
except KeyError as e:
logging.error('KeyError at async_post response')
logging.error(e)
return resp
def shutdown(self):
//need to do something here to await the remaining tasks and then I need to re-start a new event loop, which i think i can do, just don't know how to appropriately stop the current one.
self.loop.close()
return True
如何处理错误并正确关闭事件循环,以便我可以启动一个新事件循环并重新启动整个程序并继续。
编辑:
这就是我现在正在尝试的,基于 queue_manager
方法中,我将其更改为:
try:
self.loop.run_until_complete(future)
except Exception as e:
future.cancel()
self.loop.run_until_complete(future)
future.exception()
更新:
我摆脱了 shutdown()
方法并将其添加到我的 queue_manager()
方法中,它似乎工作正常:
try:
self.loop.run_until_complete(future)
except Exception as e:
future.cancel()
self.check_in_records()
self.reconnect()
self.start_job()
future.exception()
要回答最初陈述的问题,不需要 close()
一个 运行ning 循环,您可以为整个程序重复使用同一个循环。
根据更新中的代码,您的 queue_manager
可能如下所示:
try:
self.loop.run_until_complete(future)
except Exception as e:
self.check_in_records()
self.reconnect()
self.start_job()
取消 future
没有必要,据我所知没有任何效果。这与 KeyboardInterrupt
, special because it is raised by asyncio itself. KeyboardInterrupt
can be propagated by run_until_complete
without the future having actually completed. Handling Ctrl-C correctly in asyncio is very hard or even impossible (see here 的细节不同),但幸运的是问题根本不是关于 Ctrl-C,而是关于协程引发的异常。 (请注意,KeyboardInterrupt
不继承自 Exception
,因此在 Ctrl-C 的情况下,except 主体甚至不会执行。)
I was canceling the future because in this instance there are remaining tasks pending and i want to essentially remove those tasks and start a fresh event loop.
这是一件正确的事情,但是(更新的)问题中的代码只取消了一个未来,已经传递给 run_until_complete
的那个。回想一下,future 是稍后提供的结果值的占位符。一旦提供了值,就可以通过调用 future.result()
来检索它。如果 future 的 "value" 是一个异常,future.result()
将引发该异常。 run_until_complete
有约定,只要给定的未来产生一个值,它就会 运行 事件循环,然后它 returns 该值。如果 "value" 实际上是要引发的异常,那么 run_until_complete
将重新引发它。例如:
loop = asyncio.get_event_loop()
fut = loop.create_future()
loop.call_soon(fut.set_exception, ZeroDivisionError)
# raises ZeroDivisionError, as that is the future's result,
# manually set
loop.run_until_complete(fut)
当所讨论的 future 实际上是一个 Task
,一个将协程包装到 Future
的异步特定对象时,这种 future 的结果就是协程返回的对象。如果协程引发异常,那么检索结果将重新引发它,run_until_complete
:
async def fail():
1/0
loop = asyncio.get_event_loop()
fut = loop.create_task(fail())
# raises ZeroDivisionError, as that is the future's result,
# because the coroutine raises it
loop.run_until_complete(fut)
处理任务时,run_until_complete
完成意味着协程也已完成,返回值或引发异常,由run_until_complete
返回或引发决定。
另一方面,通过安排恢复任务和暂停它的 await
表达式引发 CancelledError
来取消任务。除非任务专门捕获并抑制此异常(行为良好的 asyncio 代码不应该这样做),否则任务将停止执行并且 CancelledError
将成为其结果。但是,如果调用 cancel()
时协程已经完成,则 cancel()
无法执行任何操作,因为没有待处理的 await
可将 CancelledError
注入。