run_until_complete 即使在循环停止后也会失败
run_until_complete fails even after loop has been stopped
我正在尝试编写一个 SIGTERM 处理程序,它将有我的 run_forever()-loop
- 停止接受新任务。
- 完成 运行ning 任务。
- 关机。
这是我写的学习演示:
import asyncio
import signal
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(name)s]: %(message)s', datefmt='%H:%M:%S')
_log = logging.getLogger(__name__)
class Looper:
def __init__(self, loop):
self._loop = loop
self._shutdown = False
signal.signal(signal.SIGINT, self._exit)
signal.signal(signal.SIGTERM, self._exit)
def _exit(self, sig, frame):
name = signal.Signals(sig).name
_log.info(f"Received shutdown-signal: {sig} ({name})")
self._shutdown = True
self._loop.stop() # << Stopping the event loop here.
_log.info(f"Loop stop initiated.")
pending = asyncio.all_tasks(loop=self._loop)
_log.info(f"Collected {len(pending)} tasks that have been stopped.")
if pending:
_log.info("Attempting to gather pending tasks: " + str(pending))
gatherer_set = asyncio.gather(*pending, loop=self._loop)
# self._loop.run_until_complete(gatherer_set) # << "RuntimeError: This event loop is already running"
_log.info("Shutting down for good.")
async def thumper(self, id, t):
print(f"{id}: Winding up...")
while not self._shutdown:
await asyncio.sleep(t)
print(f'{id}: Thump!')
print(f'{id}: Thud.')
loop = asyncio.get_event_loop()
lp = Looper(loop)
loop.create_task(lp.thumper('North Hall', 2))
loop.create_task(lp.thumper('South Hall', 3))
loop.run_forever()
_log.info("Done.")
在 Windows 10 和 Debian 10 上面的脚本都对 SIGINT 做出反应并产生输出
North Hall: Winding up...
South Hall: Winding up...
North Hall: Thump!
South Hall: Thump!
North Hall: Thump!
South Hall: Thump!
North Hall: Thump!
09:55:53 INFO [__main__]: Received shutdown-signal: 2 (SIGINT)
09:55:53 INFO [__main__]: Loop stop initiated.
09:55:53 INFO [__main__]: Collected 2 tasks that have been stopped.
09:55:53 INFO [__main__]: Attempting to gather pending tasks: {<Task pending coro=<Looper.thumper() running at amazing_grace.py:42> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x02F91BF0>()]>>, <Task pending coro=<Looper.thumper() running at amazing_grace.py:42> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x02F91C10>()]>>}
09:55:53 INFO [__main__]: Shutting down for good.
09:55:53 INFO [__main__]: Done.
遗憾的是,"Thud." 行表示 thumper(..) 演示调用实际上
结论,不会显示。我想,这是因为 "gather" 刚好让我得到了一套
未实现的期货。但是,如果我敢激活 run_until_complete()-
行,即使它出现在 后面 self._loop.stop(),输出
结尾如下:
[...]
10:24:25 INFO [__main__]: Collected 2 tasks that have been stopped.
10:24:25 INFO [__main__]: Attempting to gather pending tasks: {<Task pending coro=<Looper.thumper() running at amazing_grace.py:41> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x03E417D0>()]>>, <Task pending coro=<Looper.thumper() running at amazing_grace.py:41> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x03E41BF0>()]>>}
Traceback (most recent call last):
File "amazing_grace.py", line 50, in <module>
loop.run_forever()
File "C:\Python37\lib\asyncio\base_events.py", line 539, in run_forever
self._run_once()
File "C:\Python37\lib\asyncio\base_events.py", line 1739, in _run_once
event_list = self._selector.select(timeout)
File "C:\Python37\lib\selectors.py", line 323, in select
r, w, _ = self._select(self._readers, self._writers, [], timeout)
File "C:\Python37\lib\selectors.py", line 314, in _select
r, w, x = select.select(r, w, w, timeout)
File "amazing_grace.py", line 35, in _exit
self._loop.run_until_complete(gatherer_set) # << "This event loop is already running"
File "C:\Python37\lib\asyncio\base_events.py", line 571, in run_until_complete
self.run_forever()
File "C:\Python37\lib\asyncio\base_events.py", line 526, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
问题归结为
- 如何在这种情况下调用或替换 run_until_complete(..),以及
- 为什么我在停止循环后看到这个 "Loop is running"-错误。
程序应该 运行 在 Python 3.7,在 Windows 10 和Linux.
几天后编辑
正如 zaquest 在 his/her 回答中所说的那样,如果只是分配一个信号处理程序并在其中添加一个 create_task
调用,就会遇到麻烦;据我观察,该例程可能会或可能不会 运行(即使没有其他任务)。所以现在我加了一个sys.platform
检查UNIX下的脚本运行s()。如果是这样,我更喜欢更可靠的 loop.add_signal_handler
来定义回调函数,这正是我真正需要的。幸运的是,UNIX 是我的主要用例。主线:
self._loop.add_signal_handler(signal.signal(signal.SIGINT, self._exit, signal.SIGINT, None)
为什么要检查平台?:按照文档,https://docs.python.org/3/library/asyncio-eventloop.html#unix-signals,loop.add_signal_handler() 在 Windows 上不可用,这并不是真正令人惊讶的想法所讨论的信号是 UNIX 术语。
Python 信号处理程序在 main thread, in the same thread in which your loop is running. BaseEventLoop.stop()
method does not immediately stops the loop, instead it just sets a flag, so that when your loop runs next time it only executes the callbacks that has already been scheduled, and does not schedule any more callbacks (see run_forever) 中执行。但是,在您的信号处理程序 returns 之前,循环不能 运行。这意味着您不能等到循环在信号处理程序中停止。相反,你可以安排另一个任务,等待你的长 运行ning 任务对 self._shutdown
中的变化做出反应,然后停止循环。
class Looper:
...
def _exit(self, sig, frame):
name = signal.Signals(sig).name
_log.info("Received shutdown-signal: %s (%s)", sig, name)
self._shutdown = True
pending = asyncio.all_tasks(loop=self._loop)
_log.info("Attempting to gather pending tasks: " + str(pending))
if pending:
self._loop.create_task(self._wait_for_stop(pending))
async def _wait_for_stop(self, tasks):
await asyncio.gather(*tasks)
self._loop.stop() # << Stopping the event loop here.
_log.info("Loop stop initiated.")
...
还有一件事要提到的是,文档说 signal.signal()
处理程序 not allowed
与循环交互,但没有说明原因 (see)
找到一个可以从异步函数调用 self._loop.stop() 的解决方案
这将首先等待所有其他任务。请注意,它不会等待自己!
如果尝试,程序将锁定。
此外,asyncio.wait_for(..) 协程允许超时。
import asyncio
import signal
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(name)s]: %(message)s', datefmt='%H:%M:%S')
_log = logging.getLogger(__name__)
class Looper:
def __init__(self, loop):
self._loop = loop
self._shutdown = False
signal.signal(signal.SIGINT, self._exit)
signal.signal(signal.SIGTERM, self._exit)
async def _a_exit(self):
self._shutdown = True
my_task = asyncio.current_task()
pending = list(filter(lambda x: x is not my_task, asyncio.all_tasks(loop=self._loop)))
waiters = [asyncio.wait_for(p, timeout = 1.5, loop=self._loop) for p in pending]
results = await asyncio.gather(*waiters, loop=self._loop, return_exceptions=True)
n_failure = len(list(filter(lambda x: isinstance(x, Exception), results)))
_log.info(f"{n_failure} failed processes when quick-gathering the remaining {len(results)} tasks. Stopping loop now.")
self._loop.stop()
def _exit(self, sig, frame):
name = signal.Signals(sig).name
_log.info(f"Received shutdown-signal: {sig} ({name})")
self._loop.create_task(self._a_exit())
async def thumper(self, id, t):
print(f"{id}: Winding up...")
while not self._shutdown:
await asyncio.sleep(t)
print(f'{id}: Thump!')
print(f'{id}: Thud.')
loop = asyncio.get_event_loop()
lp = Looper(loop)
loop.create_task(lp.thumper('North Hall', 1))
loop.create_task(lp.thumper('South Hall', 2))
loop.create_task(lp.thumper(' West Hall', 3))
loop.create_task(lp.thumper(' East Hall', 4))
loop.run_forever()
_log.info("Done.")
On Windows 10 这可能导致输出
North Hall: Winding up...
South Hall: Winding up...
West Hall: Winding up...
East Hall: Winding up...
North Hall: Thump!
South Hall: Thump!
[..]
South Hall: Thump!
North Hall: Thump!
14:20:59 INFO [__main__]: Received shutdown-signal: 2 (SIGINT)
West Hall: Thump!
West Hall: Thud.
North Hall: Thump!
North Hall: Thud.
South Hall: Thump!
South Hall: Thud.
14:21:01 INFO [__main__]: 1 failed processes when quick-gathering the remaining 4 tasks. Stopping loop now.
14:21:01 INFO [__main__]: Done.
失败的进程因超时而失败。
请注意,这解决了我的问题。但是,为什么 loop.run_until_complete(..) 在 loop.stop() 被调用后失败的问题仍然悬而未决.
我正在尝试编写一个 SIGTERM 处理程序,它将有我的 run_forever()-loop
- 停止接受新任务。
- 完成 运行ning 任务。
- 关机。
这是我写的学习演示:
import asyncio
import signal
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(name)s]: %(message)s', datefmt='%H:%M:%S')
_log = logging.getLogger(__name__)
class Looper:
def __init__(self, loop):
self._loop = loop
self._shutdown = False
signal.signal(signal.SIGINT, self._exit)
signal.signal(signal.SIGTERM, self._exit)
def _exit(self, sig, frame):
name = signal.Signals(sig).name
_log.info(f"Received shutdown-signal: {sig} ({name})")
self._shutdown = True
self._loop.stop() # << Stopping the event loop here.
_log.info(f"Loop stop initiated.")
pending = asyncio.all_tasks(loop=self._loop)
_log.info(f"Collected {len(pending)} tasks that have been stopped.")
if pending:
_log.info("Attempting to gather pending tasks: " + str(pending))
gatherer_set = asyncio.gather(*pending, loop=self._loop)
# self._loop.run_until_complete(gatherer_set) # << "RuntimeError: This event loop is already running"
_log.info("Shutting down for good.")
async def thumper(self, id, t):
print(f"{id}: Winding up...")
while not self._shutdown:
await asyncio.sleep(t)
print(f'{id}: Thump!')
print(f'{id}: Thud.')
loop = asyncio.get_event_loop()
lp = Looper(loop)
loop.create_task(lp.thumper('North Hall', 2))
loop.create_task(lp.thumper('South Hall', 3))
loop.run_forever()
_log.info("Done.")
在 Windows 10 和 Debian 10 上面的脚本都对 SIGINT 做出反应并产生输出
North Hall: Winding up...
South Hall: Winding up...
North Hall: Thump!
South Hall: Thump!
North Hall: Thump!
South Hall: Thump!
North Hall: Thump!
09:55:53 INFO [__main__]: Received shutdown-signal: 2 (SIGINT)
09:55:53 INFO [__main__]: Loop stop initiated.
09:55:53 INFO [__main__]: Collected 2 tasks that have been stopped.
09:55:53 INFO [__main__]: Attempting to gather pending tasks: {<Task pending coro=<Looper.thumper() running at amazing_grace.py:42> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x02F91BF0>()]>>, <Task pending coro=<Looper.thumper() running at amazing_grace.py:42> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x02F91C10>()]>>}
09:55:53 INFO [__main__]: Shutting down for good.
09:55:53 INFO [__main__]: Done.
遗憾的是,"Thud." 行表示 thumper(..) 演示调用实际上 结论,不会显示。我想,这是因为 "gather" 刚好让我得到了一套 未实现的期货。但是,如果我敢激活 run_until_complete()- 行,即使它出现在 后面 self._loop.stop(),输出 结尾如下:
[...]
10:24:25 INFO [__main__]: Collected 2 tasks that have been stopped.
10:24:25 INFO [__main__]: Attempting to gather pending tasks: {<Task pending coro=<Looper.thumper() running at amazing_grace.py:41> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x03E417D0>()]>>, <Task pending coro=<Looper.thumper() running at amazing_grace.py:41> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x03E41BF0>()]>>}
Traceback (most recent call last):
File "amazing_grace.py", line 50, in <module>
loop.run_forever()
File "C:\Python37\lib\asyncio\base_events.py", line 539, in run_forever
self._run_once()
File "C:\Python37\lib\asyncio\base_events.py", line 1739, in _run_once
event_list = self._selector.select(timeout)
File "C:\Python37\lib\selectors.py", line 323, in select
r, w, _ = self._select(self._readers, self._writers, [], timeout)
File "C:\Python37\lib\selectors.py", line 314, in _select
r, w, x = select.select(r, w, w, timeout)
File "amazing_grace.py", line 35, in _exit
self._loop.run_until_complete(gatherer_set) # << "This event loop is already running"
File "C:\Python37\lib\asyncio\base_events.py", line 571, in run_until_complete
self.run_forever()
File "C:\Python37\lib\asyncio\base_events.py", line 526, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
问题归结为
- 如何在这种情况下调用或替换 run_until_complete(..),以及
- 为什么我在停止循环后看到这个 "Loop is running"-错误。
程序应该 运行 在 Python 3.7,在 Windows 10 和Linux.
几天后编辑
正如 zaquest 在 his/her 回答中所说的那样,如果只是分配一个信号处理程序并在其中添加一个 create_task
调用,就会遇到麻烦;据我观察,该例程可能会或可能不会 运行(即使没有其他任务)。所以现在我加了一个sys.platform
检查UNIX下的脚本运行s()。如果是这样,我更喜欢更可靠的 loop.add_signal_handler
来定义回调函数,这正是我真正需要的。幸运的是,UNIX 是我的主要用例。主线:
self._loop.add_signal_handler(signal.signal(signal.SIGINT, self._exit, signal.SIGINT, None)
为什么要检查平台?:按照文档,https://docs.python.org/3/library/asyncio-eventloop.html#unix-signals,loop.add_signal_handler() 在 Windows 上不可用,这并不是真正令人惊讶的想法所讨论的信号是 UNIX 术语。
Python 信号处理程序在 main thread, in the same thread in which your loop is running. BaseEventLoop.stop()
method does not immediately stops the loop, instead it just sets a flag, so that when your loop runs next time it only executes the callbacks that has already been scheduled, and does not schedule any more callbacks (see run_forever) 中执行。但是,在您的信号处理程序 returns 之前,循环不能 运行。这意味着您不能等到循环在信号处理程序中停止。相反,你可以安排另一个任务,等待你的长 运行ning 任务对 self._shutdown
中的变化做出反应,然后停止循环。
class Looper:
...
def _exit(self, sig, frame):
name = signal.Signals(sig).name
_log.info("Received shutdown-signal: %s (%s)", sig, name)
self._shutdown = True
pending = asyncio.all_tasks(loop=self._loop)
_log.info("Attempting to gather pending tasks: " + str(pending))
if pending:
self._loop.create_task(self._wait_for_stop(pending))
async def _wait_for_stop(self, tasks):
await asyncio.gather(*tasks)
self._loop.stop() # << Stopping the event loop here.
_log.info("Loop stop initiated.")
...
还有一件事要提到的是,文档说 signal.signal()
处理程序 not allowed
与循环交互,但没有说明原因 (see)
找到一个可以从异步函数调用 self._loop.stop() 的解决方案 这将首先等待所有其他任务。请注意,它不会等待自己! 如果尝试,程序将锁定。
此外,asyncio.wait_for(..) 协程允许超时。
import asyncio
import signal
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(name)s]: %(message)s', datefmt='%H:%M:%S')
_log = logging.getLogger(__name__)
class Looper:
def __init__(self, loop):
self._loop = loop
self._shutdown = False
signal.signal(signal.SIGINT, self._exit)
signal.signal(signal.SIGTERM, self._exit)
async def _a_exit(self):
self._shutdown = True
my_task = asyncio.current_task()
pending = list(filter(lambda x: x is not my_task, asyncio.all_tasks(loop=self._loop)))
waiters = [asyncio.wait_for(p, timeout = 1.5, loop=self._loop) for p in pending]
results = await asyncio.gather(*waiters, loop=self._loop, return_exceptions=True)
n_failure = len(list(filter(lambda x: isinstance(x, Exception), results)))
_log.info(f"{n_failure} failed processes when quick-gathering the remaining {len(results)} tasks. Stopping loop now.")
self._loop.stop()
def _exit(self, sig, frame):
name = signal.Signals(sig).name
_log.info(f"Received shutdown-signal: {sig} ({name})")
self._loop.create_task(self._a_exit())
async def thumper(self, id, t):
print(f"{id}: Winding up...")
while not self._shutdown:
await asyncio.sleep(t)
print(f'{id}: Thump!')
print(f'{id}: Thud.')
loop = asyncio.get_event_loop()
lp = Looper(loop)
loop.create_task(lp.thumper('North Hall', 1))
loop.create_task(lp.thumper('South Hall', 2))
loop.create_task(lp.thumper(' West Hall', 3))
loop.create_task(lp.thumper(' East Hall', 4))
loop.run_forever()
_log.info("Done.")
On Windows 10 这可能导致输出
North Hall: Winding up...
South Hall: Winding up...
West Hall: Winding up...
East Hall: Winding up...
North Hall: Thump!
South Hall: Thump!
[..]
South Hall: Thump!
North Hall: Thump!
14:20:59 INFO [__main__]: Received shutdown-signal: 2 (SIGINT)
West Hall: Thump!
West Hall: Thud.
North Hall: Thump!
North Hall: Thud.
South Hall: Thump!
South Hall: Thud.
14:21:01 INFO [__main__]: 1 failed processes when quick-gathering the remaining 4 tasks. Stopping loop now.
14:21:01 INFO [__main__]: Done.
失败的进程因超时而失败。
请注意,这解决了我的问题。但是,为什么 loop.run_until_complete(..) 在 loop.stop() 被调用后失败的问题仍然悬而未决.