Python multiprocessing - 捕获信号以重新启动子进程或关闭父进程
Python multiprocessing - Capturing signals to restart child processes or shut down parent process
我正在使用多处理库生成两个子进程。我想确保只要父进程还活着,如果子进程死了(收到 SIGKILL 或 SIGTERM),它们就会自动重启。另一方面,如果父进程收到 SIGTERM/SIGINT,我希望它终止所有子进程然后退出。
我是这样解决问题的:
import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN
from functools import partial
import multiprocessing
import setproctitle
class HelloWorld(multiprocessing.Process):
def __init__(self):
super(HelloWorld, self).__init__()
# ignore, let parent handle it
signal(SIGTERM, SIG_IGN)
def run(self):
setproctitle.setproctitle("helloProcess")
while True:
print "Hello World"
time.sleep(1)
class Counter(multiprocessing.Process):
def __init__(self):
super(Counter, self).__init__()
self.counter = 1
# ignore, let parent handle it
signal(SIGTERM, SIG_IGN)
def run(self):
setproctitle.setproctitle("counterProcess")
while True:
print self.counter
time.sleep(1)
self.counter += 1
def signal_handler(helloProcess, counterProcess, signum, frame):
print multiprocessing.active_children()
print "helloProcess: ", helloProcess
print "counterProcess: ", counterProcess
if signum == 17:
print "helloProcess: ", helloProcess.is_alive()
if not helloProcess.is_alive():
print "Restarting helloProcess"
helloProcess = HelloWorld()
helloProcess.start()
print "counterProcess: ", counterProcess.is_alive()
if not counterProcess.is_alive():
print "Restarting counterProcess"
counterProcess = Counter()
counterProcess.start()
else:
if helloProcess.is_alive():
print "Stopping helloProcess"
helloProcess.terminate()
if counterProcess.is_alive():
print "Stopping counterProcess"
counterProcess.terminate()
sys.exit(0)
if __name__ == '__main__':
helloProcess = HelloWorld()
helloProcess.start()
counterProcess = Counter()
counterProcess.start()
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, partial(signal_handler, helloProcess, counterProcess))
multiprocessing.active_children()
如果我向 counterProcess 发送 SIGKILL,它将正确重启。但是,向 helloProcess 发送 SIGKILL 也会重新启动 counterProcess 而不是 helloProcess?
如果我向父进程发送 SIGTERM,父进程将退出,但子进程成为孤儿并继续。我该如何纠正这种行为?
代码有几个问题,所以我将在后续讨论它们。
If I send a SIGKILL to the counterProcess, it will restart correctly. However, sending a SIGKILL to the helloProcess also restarts the counterProcess instead of the helloProcess?
这种奇怪的行为很可能是由于您的主进程中缺少阻塞调用,因为 multiprocessing.active_children()
并不是真正的阻塞调用。我无法真正解释程序如此行为的确切原因,但在 __main__
函数中添加阻塞调用,例如
while True:
time.sleep(1)
解决了问题。
另一个非常严重的问题是将对象传递给处理程序的方式:
helloProcess = HelloWorld()
...
partial(signal_handler, helloProcess, counterProcess)
这是过时的,考虑到您在其中创建了新对象:
if not helloProcess.is_alive():
print "Restarting helloProcess"
helloProcess = HelloWorld()
helloProcess.start()
请注意,两个对象对 HelloWorld()
对象使用不同的别名。部分对象绑定到 __main__
函数中的别名,而回调中的对象绑定到其局部范围别名。因此,通过将新对象分配给本地范围别名,您实际上并不会影响回调绑定到的对象(它仍然绑定到在 __main__
范围内创建的对象)。
您可以通过在回调范围内以相同的方式将信号回调与新对象重新绑定来修复它:
def signal_handler(...):
...
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, partial(signal_handler, helloProcess, counterProcess))
...
然而,这会导致另一个陷阱,因为现在每个子进程都会从父进程继承回调,并在每次收到信号时访问它。要修复它,您可以在创建子进程之前临时将信号处理程序设置为默认值:
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, SIG_DFL)
最后,您可能希望在终止它们之前抑制来自您的子进程的任何信号,否则它们会再次触发回调:
signal(SIGCHLD, SIG_IGN)
请注意,您可能想重新设计应用程序的体系结构并利用 multiprocessing
提供的一些功能。
最终代码:
import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN, SIG_DFL
from functools import partial
import multiprocessing
#import setproctitle
class HelloWorld(multiprocessing.Process):
def __init__(self):
super(HelloWorld, self).__init__()
# ignore, let parent handle it
#signal(SIGTERM, SIG_IGN)
def run(self):
#setproctitle.setproctitle("helloProcess")
while True:
print "Hello World"
time.sleep(1)
class Counter(multiprocessing.Process):
def __init__(self):
super(Counter, self).__init__()
self.counter = 1
# ignore, let parent handle it
#signal(SIGTERM, SIG_IGN)
def run(self):
#setproctitle.setproctitle("counterProcess")
while True:
print self.counter
time.sleep(1)
self.counter += 1
def signal_handler(helloProcess, counterProcess, signum, frame):
print multiprocessing.active_children()
print "helloProcess: ", helloProcess
print "counterProcess: ", counterProcess
print "current_process: ", multiprocessing.current_process()
if signum == 17:
# Since each new child inherits current signal handler,
# temporarily set it to default before spawning new child.
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, SIG_DFL)
print "helloProcess: ", helloProcess.is_alive()
if not helloProcess.is_alive():
print "Restarting helloProcess"
helloProcess = HelloWorld()
helloProcess.start()
print "counterProcess: ", counterProcess.is_alive()
if not counterProcess.is_alive():
print "Restarting counterProcess"
counterProcess = Counter()
counterProcess.start()
# After new children are spawned, revert to old signal handling policy.
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, partial(signal_handler, helloProcess, counterProcess))
else:
# Ignore any signal that child communicates before quit
signal(SIGCHLD, SIG_IGN)
if helloProcess.is_alive():
print "Stopping helloProcess"
helloProcess.terminate()
if counterProcess.is_alive():
print "Stopping counterProcess"
counterProcess.terminate()
sys.exit(0)
if __name__ == '__main__':
helloProcess = HelloWorld()
helloProcess.start()
counterProcess = Counter()
counterProcess.start()
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, partial(signal_handler, helloProcess, counterProcess))
while True:
print multiprocessing.active_children()
time.sleep(1)
要从 signal.SIGCHLD
处理程序重新创建死的 children,母亲必须调用 os.wait
函数之一,因为 Process.is_alive
在这里不起作用。
虽然可能,但它很复杂,因为 signal.SIGCHLD
是在其中一个 children 状态更改 f.e 时交付给母亲的。 signal.SIGSTOP
、signal.SIGCONT
或任何其他终止信号由 child 接收。
因此 signal.SIGCHLD
处理程序必须区分 child 的这些状态。仅在交付 signal.SIGCHLD
时重新创建 children 可能会创建比必要更多的 children。
以下代码使用 os.waitpid
和 os.WNOHANG
使其成为 non-blocking 并且 os.WUNTRACED
和 os.WCONTINUED
用于学习 signal.SIGCHLD
是否来自signal.SIGSTOP
或 signal.SIGCONT
.
os.waitpid
不起作用,即 returns (0, 0)
如果任何 Process
实例被 print
ed,即 str(Process())
在你调用 os.waitpid
.
import sys
import time
from signal import signal, pause, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_DFL
import multiprocessing
import os
class HelloWorld(multiprocessing.Process):
def run(self):
# reset SIGTERM to default for Process.terminate to work
signal(SIGTERM, SIG_DFL)
while True:
print "Hello World"
time.sleep(1)
class Counter(multiprocessing.Process):
def __init__(self):
super(Counter, self).__init__()
self.counter = 1
def run(self):
# reset SIGTERM to default for Process.terminate to work
signal(SIGTERM, SIG_DFL)
while True:
print self.counter
time.sleep(1)
self.counter += 1
def signal_handler(signum, _):
global helloProcess, counterProcess
if signum == SIGCHLD:
pid, status = os.waitpid(-1, os.WNOHANG|os.WUNTRACED|os.WCONTINUED)
if os.WIFCONTINUED(status) or os.WIFSTOPPED(status):
return
if os.WIFSIGNALED(status) or os.WIFEXITED(status):
if helloProcess.pid == pid:
print("Restarting helloProcess")
helloProcess = HelloWorld()
helloProcess.start()
elif counterProcess.pid == pid:
print("Restarting counterProcess")
counterProcess = Counter()
counterProcess.start()
else:
# mother shouldn't be notified when it terminates children
signal(SIGCHLD, SIG_DFL)
if helloProcess.is_alive():
print("Stopping helloProcess")
helloProcess.terminate()
if counterProcess.is_alive():
print("Stopping counterProcess")
counterProcess.terminate()
sys.exit(0)
if __name__ == '__main__':
helloProcess = HelloWorld()
helloProcess.start()
counterProcess = Counter()
counterProcess.start()
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, signal_handler)
while True:
pause()
以下代码在不使用 signal.SIGCHLD
的情况下重新创建死 children。所以它比前者更简单。
创建了两个 children 后,母进程为 SIGINT、SIGTERM、SIGQUIT 设置了一个名为 term_child
的信号处理程序。 term_child
在调用时终止并加入每个 child。
母进程不断检查 children 是否存活,并在必要时在 while
循环中重新创建它们。
因为每个 child 都从 mother 继承了信号处理程序,SIGINT
处理程序应该重置为默认值 Process.terminate
才能工作
import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT
import multiprocessing
class HelloWorld(multiprocessing.Process):
def run(self):
signal(SIGTERM, SIG_DFL)
while True:
print "Hello World"
time.sleep(1)
class Counter(multiprocessing.Process):
def __init__(self):
super(Counter, self).__init__()
self.counter = 1
def run(self):
signal(SIGTERM, SIG_DFL)
while True:
print self.counter
time.sleep(1)
self.counter += 1
def term_child(_, __):
for child in children:
child.terminate()
child.join()
sys.exit(0)
if __name__ == '__main__':
children = [HelloWorld(), Counter()]
for child in children:
child.start()
for signame in (SIGINT, SIGTERM, SIGQUIT):
signal(signame, term_child)
while True:
for i, child in enumerate(children):
if not child.is_alive():
children[i] = type(child)()
children[i].start()
time.sleep(1)
我正在使用多处理库生成两个子进程。我想确保只要父进程还活着,如果子进程死了(收到 SIGKILL 或 SIGTERM),它们就会自动重启。另一方面,如果父进程收到 SIGTERM/SIGINT,我希望它终止所有子进程然后退出。
我是这样解决问题的:
import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN
from functools import partial
import multiprocessing
import setproctitle
class HelloWorld(multiprocessing.Process):
def __init__(self):
super(HelloWorld, self).__init__()
# ignore, let parent handle it
signal(SIGTERM, SIG_IGN)
def run(self):
setproctitle.setproctitle("helloProcess")
while True:
print "Hello World"
time.sleep(1)
class Counter(multiprocessing.Process):
def __init__(self):
super(Counter, self).__init__()
self.counter = 1
# ignore, let parent handle it
signal(SIGTERM, SIG_IGN)
def run(self):
setproctitle.setproctitle("counterProcess")
while True:
print self.counter
time.sleep(1)
self.counter += 1
def signal_handler(helloProcess, counterProcess, signum, frame):
print multiprocessing.active_children()
print "helloProcess: ", helloProcess
print "counterProcess: ", counterProcess
if signum == 17:
print "helloProcess: ", helloProcess.is_alive()
if not helloProcess.is_alive():
print "Restarting helloProcess"
helloProcess = HelloWorld()
helloProcess.start()
print "counterProcess: ", counterProcess.is_alive()
if not counterProcess.is_alive():
print "Restarting counterProcess"
counterProcess = Counter()
counterProcess.start()
else:
if helloProcess.is_alive():
print "Stopping helloProcess"
helloProcess.terminate()
if counterProcess.is_alive():
print "Stopping counterProcess"
counterProcess.terminate()
sys.exit(0)
if __name__ == '__main__':
helloProcess = HelloWorld()
helloProcess.start()
counterProcess = Counter()
counterProcess.start()
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, partial(signal_handler, helloProcess, counterProcess))
multiprocessing.active_children()
如果我向 counterProcess 发送 SIGKILL,它将正确重启。但是,向 helloProcess 发送 SIGKILL 也会重新启动 counterProcess 而不是 helloProcess?
如果我向父进程发送 SIGTERM,父进程将退出,但子进程成为孤儿并继续。我该如何纠正这种行为?
代码有几个问题,所以我将在后续讨论它们。
If I send a SIGKILL to the counterProcess, it will restart correctly. However, sending a SIGKILL to the helloProcess also restarts the counterProcess instead of the helloProcess?
这种奇怪的行为很可能是由于您的主进程中缺少阻塞调用,因为 multiprocessing.active_children()
并不是真正的阻塞调用。我无法真正解释程序如此行为的确切原因,但在 __main__
函数中添加阻塞调用,例如
while True:
time.sleep(1)
解决了问题。
另一个非常严重的问题是将对象传递给处理程序的方式:
helloProcess = HelloWorld()
...
partial(signal_handler, helloProcess, counterProcess)
这是过时的,考虑到您在其中创建了新对象:
if not helloProcess.is_alive():
print "Restarting helloProcess"
helloProcess = HelloWorld()
helloProcess.start()
请注意,两个对象对 HelloWorld()
对象使用不同的别名。部分对象绑定到 __main__
函数中的别名,而回调中的对象绑定到其局部范围别名。因此,通过将新对象分配给本地范围别名,您实际上并不会影响回调绑定到的对象(它仍然绑定到在 __main__
范围内创建的对象)。
您可以通过在回调范围内以相同的方式将信号回调与新对象重新绑定来修复它:
def signal_handler(...):
...
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, partial(signal_handler, helloProcess, counterProcess))
...
然而,这会导致另一个陷阱,因为现在每个子进程都会从父进程继承回调,并在每次收到信号时访问它。要修复它,您可以在创建子进程之前临时将信号处理程序设置为默认值:
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, SIG_DFL)
最后,您可能希望在终止它们之前抑制来自您的子进程的任何信号,否则它们会再次触发回调:
signal(SIGCHLD, SIG_IGN)
请注意,您可能想重新设计应用程序的体系结构并利用 multiprocessing
提供的一些功能。
最终代码:
import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN, SIG_DFL
from functools import partial
import multiprocessing
#import setproctitle
class HelloWorld(multiprocessing.Process):
def __init__(self):
super(HelloWorld, self).__init__()
# ignore, let parent handle it
#signal(SIGTERM, SIG_IGN)
def run(self):
#setproctitle.setproctitle("helloProcess")
while True:
print "Hello World"
time.sleep(1)
class Counter(multiprocessing.Process):
def __init__(self):
super(Counter, self).__init__()
self.counter = 1
# ignore, let parent handle it
#signal(SIGTERM, SIG_IGN)
def run(self):
#setproctitle.setproctitle("counterProcess")
while True:
print self.counter
time.sleep(1)
self.counter += 1
def signal_handler(helloProcess, counterProcess, signum, frame):
print multiprocessing.active_children()
print "helloProcess: ", helloProcess
print "counterProcess: ", counterProcess
print "current_process: ", multiprocessing.current_process()
if signum == 17:
# Since each new child inherits current signal handler,
# temporarily set it to default before spawning new child.
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, SIG_DFL)
print "helloProcess: ", helloProcess.is_alive()
if not helloProcess.is_alive():
print "Restarting helloProcess"
helloProcess = HelloWorld()
helloProcess.start()
print "counterProcess: ", counterProcess.is_alive()
if not counterProcess.is_alive():
print "Restarting counterProcess"
counterProcess = Counter()
counterProcess.start()
# After new children are spawned, revert to old signal handling policy.
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, partial(signal_handler, helloProcess, counterProcess))
else:
# Ignore any signal that child communicates before quit
signal(SIGCHLD, SIG_IGN)
if helloProcess.is_alive():
print "Stopping helloProcess"
helloProcess.terminate()
if counterProcess.is_alive():
print "Stopping counterProcess"
counterProcess.terminate()
sys.exit(0)
if __name__ == '__main__':
helloProcess = HelloWorld()
helloProcess.start()
counterProcess = Counter()
counterProcess.start()
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, partial(signal_handler, helloProcess, counterProcess))
while True:
print multiprocessing.active_children()
time.sleep(1)
要从 signal.SIGCHLD
处理程序重新创建死的 children,母亲必须调用 os.wait
函数之一,因为 Process.is_alive
在这里不起作用。
虽然可能,但它很复杂,因为 signal.SIGCHLD
是在其中一个 children 状态更改 f.e 时交付给母亲的。 signal.SIGSTOP
、signal.SIGCONT
或任何其他终止信号由 child 接收。
因此 signal.SIGCHLD
处理程序必须区分 child 的这些状态。仅在交付 signal.SIGCHLD
时重新创建 children 可能会创建比必要更多的 children。
以下代码使用 os.waitpid
和 os.WNOHANG
使其成为 non-blocking 并且 os.WUNTRACED
和 os.WCONTINUED
用于学习 signal.SIGCHLD
是否来自signal.SIGSTOP
或 signal.SIGCONT
.
os.waitpid
不起作用,即 returns (0, 0)
如果任何 Process
实例被 print
ed,即 str(Process())
在你调用 os.waitpid
.
import sys
import time
from signal import signal, pause, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_DFL
import multiprocessing
import os
class HelloWorld(multiprocessing.Process):
def run(self):
# reset SIGTERM to default for Process.terminate to work
signal(SIGTERM, SIG_DFL)
while True:
print "Hello World"
time.sleep(1)
class Counter(multiprocessing.Process):
def __init__(self):
super(Counter, self).__init__()
self.counter = 1
def run(self):
# reset SIGTERM to default for Process.terminate to work
signal(SIGTERM, SIG_DFL)
while True:
print self.counter
time.sleep(1)
self.counter += 1
def signal_handler(signum, _):
global helloProcess, counterProcess
if signum == SIGCHLD:
pid, status = os.waitpid(-1, os.WNOHANG|os.WUNTRACED|os.WCONTINUED)
if os.WIFCONTINUED(status) or os.WIFSTOPPED(status):
return
if os.WIFSIGNALED(status) or os.WIFEXITED(status):
if helloProcess.pid == pid:
print("Restarting helloProcess")
helloProcess = HelloWorld()
helloProcess.start()
elif counterProcess.pid == pid:
print("Restarting counterProcess")
counterProcess = Counter()
counterProcess.start()
else:
# mother shouldn't be notified when it terminates children
signal(SIGCHLD, SIG_DFL)
if helloProcess.is_alive():
print("Stopping helloProcess")
helloProcess.terminate()
if counterProcess.is_alive():
print("Stopping counterProcess")
counterProcess.terminate()
sys.exit(0)
if __name__ == '__main__':
helloProcess = HelloWorld()
helloProcess.start()
counterProcess = Counter()
counterProcess.start()
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
signal(signame, signal_handler)
while True:
pause()
以下代码在不使用 signal.SIGCHLD
的情况下重新创建死 children。所以它比前者更简单。
创建了两个 children 后,母进程为 SIGINT、SIGTERM、SIGQUIT 设置了一个名为 term_child
的信号处理程序。 term_child
在调用时终止并加入每个 child。
母进程不断检查 children 是否存活,并在必要时在 while
循环中重新创建它们。
因为每个 child 都从 mother 继承了信号处理程序,SIGINT
处理程序应该重置为默认值 Process.terminate
才能工作
import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT
import multiprocessing
class HelloWorld(multiprocessing.Process):
def run(self):
signal(SIGTERM, SIG_DFL)
while True:
print "Hello World"
time.sleep(1)
class Counter(multiprocessing.Process):
def __init__(self):
super(Counter, self).__init__()
self.counter = 1
def run(self):
signal(SIGTERM, SIG_DFL)
while True:
print self.counter
time.sleep(1)
self.counter += 1
def term_child(_, __):
for child in children:
child.terminate()
child.join()
sys.exit(0)
if __name__ == '__main__':
children = [HelloWorld(), Counter()]
for child in children:
child.start()
for signame in (SIGINT, SIGTERM, SIGQUIT):
signal(signame, term_child)
while True:
for i, child in enumerate(children):
if not child.is_alive():
children[i] = type(child)()
children[i].start()
time.sleep(1)