如何使用 ProcessPoolExecutor 优雅地终止 loop.run_in_executor?
How to terminate loop.run_in_executor with ProcessPoolExecutor gracefully?
如何优雅地用 ProcessPoolExecutor
终止 loop.run_in_executor
?启动程序后不久,发送 SIGINT (ctrl + c)。
def blocking_task():
sleep(3)
async def main():
exe = concurrent.futures.ProcessPoolExecutor(max_workers=4)
loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(exe, blocking_task) for i in range(3)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('ctrl + c')
如果 max_workers
等于或小于任务数,一切正常。但是如果max_workers
更大,上面代码的输出结果如下:
Process ForkProcess-4:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 97, in get
res = self._recv_bytes()
File "/usr/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
ctrl + c
我只想捕获异常 (KeyboardInterrupt) 一次并忽略或忽略进程池中的其他异常,但是如何?
更新额外学分:
- 你能解释一下多重异常的(原因)吗?
- 添加信号处理程序是否适用于 Windows?
- 如果没有,是否有没有信号处理程序的解决方案?
您可以使用 ProcessPoolExecutor
的 initializer
parameter 在每个进程中安装 SIGINT
的处理程序。
更新:
在 Unix 上,当进程被创建时,它就成为其父进程组的成员。如果您使用 Ctrl+C
生成 SIGINT
,则信号将发送到整个进程组。
import asyncio
import concurrent.futures
import os
import signal
import sys
from time import sleep
def handler(signum, frame):
print('SIGINT for PID=', os.getpid())
sys.exit(0)
def init():
signal.signal(signal.SIGINT, handler)
def blocking_task():
sleep(15)
async def main():
exe = concurrent.futures.ProcessPoolExecutor(max_workers=5, initializer=init)
loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(exe, blocking_task) for i in range(2)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('ctrl + c')
Ctrl-C
开始后不久:
^CSIGINT for PID= 59942
SIGINT for PID= 59943
SIGINT for PID= 59941
SIGINT for PID= 59945
SIGINT for PID= 59944
ctrl + c
如何优雅地用 ProcessPoolExecutor
终止 loop.run_in_executor
?启动程序后不久,发送 SIGINT (ctrl + c)。
def blocking_task():
sleep(3)
async def main():
exe = concurrent.futures.ProcessPoolExecutor(max_workers=4)
loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(exe, blocking_task) for i in range(3)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('ctrl + c')
如果 max_workers
等于或小于任务数,一切正常。但是如果max_workers
更大,上面代码的输出结果如下:
Process ForkProcess-4:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 97, in get
res = self._recv_bytes()
File "/usr/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
ctrl + c
我只想捕获异常 (KeyboardInterrupt) 一次并忽略或忽略进程池中的其他异常,但是如何?
更新额外学分:
- 你能解释一下多重异常的(原因)吗?
- 添加信号处理程序是否适用于 Windows?
- 如果没有,是否有没有信号处理程序的解决方案?
您可以使用 ProcessPoolExecutor
的 initializer
parameter 在每个进程中安装 SIGINT
的处理程序。
更新:
在 Unix 上,当进程被创建时,它就成为其父进程组的成员。如果您使用 Ctrl+C
生成 SIGINT
,则信号将发送到整个进程组。
import asyncio
import concurrent.futures
import os
import signal
import sys
from time import sleep
def handler(signum, frame):
print('SIGINT for PID=', os.getpid())
sys.exit(0)
def init():
signal.signal(signal.SIGINT, handler)
def blocking_task():
sleep(15)
async def main():
exe = concurrent.futures.ProcessPoolExecutor(max_workers=5, initializer=init)
loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(exe, blocking_task) for i in range(2)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('ctrl + c')
Ctrl-C
开始后不久:
^CSIGINT for PID= 59942
SIGINT for PID= 59943
SIGINT for PID= 59941
SIGINT for PID= 59945
SIGINT for PID= 59944
ctrl + c