Python multiprocessing - Capturing signals to restart child processes or shut down parent process

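I'm using the multiprocessing library to spawn two child processes. As long as the parent process is alive, I want the children to be restarted automatically if they die (receive SIGKILL or SIGTERM). On the other hand, if the parent process receives SIGTERM/SIGINT, I want it to terminate all children and then exit.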

This is how I approached the problem:

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN
from functools import partial
import multiprocessing
import setproctitle

class HelloWorld(multiprocessing.Process):
    def __init__(self):
        super(HelloWorld, self).__init__()

        # ignore, let parent handle it
        signal(SIGTERM, SIG_IGN)

    def run(self):

        setproctitle.setproctitle("helloProcess")

        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()

        self.counter = 1

        # ignore, let parent handle it
        signal(SIGTERM, SIG_IGN)

    def run(self):

        setproctitle.setproctitle("counterProcess")

        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(helloProcess, counterProcess, signum, frame):

    print multiprocessing.active_children()
    print "helloProcess: ", helloProcess
    print "counterProcess: ", counterProcess

    if signum == 17:

        print "helloProcess: ", helloProcess.is_alive()

        if not helloProcess.is_alive():
            print "Restarting helloProcess"

            helloProcess = HelloWorld()
            helloProcess.start()

        print "counterProcess: ", counterProcess.is_alive()

        if not counterProcess.is_alive():
            print "Restarting counterProcess"

            counterProcess = Counter()
            counterProcess.start()

    else:

        if helloProcess.is_alive():
            print "Stopping helloProcess"
            helloProcess.terminate()

        if counterProcess.is_alive():
            print "Stopping counterProcess"
            counterProcess.terminate()

        sys.exit(0)



if __name__ == '__main__':

    helloProcess = HelloWorld()
    helloProcess.start()

    counterProcess = Counter()
    counterProcess.start()

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))

    multiprocessing.active_children()

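If I send a SIGKILL to counterProcess, it restarts correctly. However, sending a SIGKILL to helloProcess also restarts counterProcess instead of helloProcess?

If I send a SIGTERM to the parent process, the parent exits, but the children are orphaned and keep running. How can I fix this behavior?

There are several issues with the code, so I'll go through them one by one.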

If I send a SIGKILL to the counterProcess, it will restart correctly. However, sending a SIGKILL to the helloProcess also restarts the counterProcess instead of the helloProcess?

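This odd behavior is most likely caused by the lack of a blocking call in your main process, since multiprocessing.active_children() is not really a blocking call. I can't explain exactly why the program behaves this way, but adding a blocking call to the __main__ block, such as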

while True:
    time.sleep(1)

solved the problem.
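
As a sketch of a blocking alternative to the sleep loop (my suggestion, not part of this answer): on Unix, signal.pause() suspends the main thread until a signal handler has run. The toy below uses SIGALRM, scheduled with signal.alarm(), only to show that pause() returns after the handler executes; in the real program the handler would be the SIGCHLD/SIGTERM one.

```python
import signal

received = []

def on_alarm(signum, frame):
    # Record which signal woke the main thread up.
    received.append(signum)

signal.signal(signal.SIGALRM, on_alarm)
signal.alarm(1)   # ask the kernel to send SIGALRM to this process in about 1 second
signal.pause()    # block the main thread until a signal handler has run
```

The later answer in this thread uses the same idea with `while True: pause()`.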

Another, much more serious problem is the way you pass the objects to the handler:

helloProcess = HelloWorld()
...
partial(signal_handler, helloProcess, counterProcess)

These references go stale, because you create new objects inside the handler:

if not helloProcess.is_alive():
    print "Restarting helloProcess"

    helloProcess = HelloWorld()
    helloProcess.start()

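Note that two different names refer to HelloWorld() objects here. The partial object is bound to the object named in the __main__ block, while the name inside the callback is a local variable. Assigning a new object to that local name therefore does not change the object the callback is bound to: the callback still holds the object created in the __main__ scope.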
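
A minimal pure-Python illustration of this binding issue; Worker is a hypothetical stand-in for HelloWorld so the example runs without spawning processes:

```python
from functools import partial

class Worker:
    """Hypothetical stand-in for HelloWorld; 'generation' counts restarts."""
    def __init__(self, generation):
        self.generation = generation

def handler(worker):
    # Rebinding the parameter only changes this call's local name;
    # the partial object below keeps pointing at the original Worker.
    worker = Worker(worker.generation + 1)
    return worker.generation

original = Worker(1)
callback = partial(handler, original)

first = callback()   # builds a generation-2 Worker, then discards it
second = callback()  # starts again from the generation-1 Worker
```

Both calls return 2, because the partial never sees the replacement object.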

You can fix this by re-registering the signal callback with the new objects, in the same way, from inside the callback:

def signal_handler(...):
    ...
    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))
    ...
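
Re-registering the handler works. A different approach (my substitution, not what this answer does) is to give the handler a mutable container and replace entries in it, so the partial never goes stale. A sketch with the same kind of hypothetical Worker stand-in:

```python
from functools import partial

class Worker:
    """Hypothetical stand-in for HelloWorld/Counter."""
    def __init__(self, generation):
        self.generation = generation

def handler(procs, name):
    # Mutate the shared dict instead of rebinding a local name,
    # so every later call sees the replacement object.
    procs[name] = Worker(procs[name].generation + 1)

procs = {"hello": Worker(1), "counter": Worker(1)}
callback = partial(handler, procs)

callback("hello")
callback("hello")
```

After the two calls, procs["hello"] is a generation-3 Worker and procs["counter"] is untouched.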

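However, this leads to another pitfall: every child process now inherits the callback from the parent and runs it whenever it receives one of those signals. To fix this, you can temporarily reset the signal handlers to their defaults before creating the child processes: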

for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
    signal(signame, SIG_DFL)
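
If the previous handlers should come back afterwards, the reset can be wrapped in a small context manager. default_signals is a hypothetical helper name; it relies only on signal.getsignal and signal.signal, and like any signal.signal call it must run in the main thread.

```python
import signal
from contextlib import contextmanager

@contextmanager
def default_signals(signums):
    """Temporarily set each signal in signums to SIG_DFL, then restore the old handlers.

    Assumes the current handlers were installed from Python (getsignal() returns
    None for handlers installed from C, which signal.signal() cannot restore).
    """
    saved = {signum: signal.getsignal(signum) for signum in signums}
    try:
        for signum in signums:
            signal.signal(signum, signal.SIG_DFL)
        yield
    finally:
        for signum, handler in saved.items():
            signal.signal(signum, handler)

def parent_handler(signum, frame):
    pass  # placeholder for the real parent-side handler

signal.signal(signal.SIGUSR1, parent_handler)
with default_signals([signal.SIGUSR1]):
    # Children started here (e.g. helloProcess.start()) would inherit SIG_DFL.
    inside = signal.getsignal(signal.SIGUSR1)
after = signal.getsignal(signal.SIGUSR1)
```

Keep in mind that while the defaults are in place, a SIGTERM or SIGINT sent to the parent takes its default action and kills it.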

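Finally, you may want to suppress any signals from your child processes before terminating them; otherwise they will trigger the callback again: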

signal(SIGCHLD, SIG_IGN)

Note that you may want to redesign your application's architecture and make use of some of the features multiprocessing provides.
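
As one example of such a feature (my choice, not necessarily what the answerer had in mind): every Process exposes a sentinel, and multiprocessing.connection.wait() blocks until one of the given sentinels is ready, i.e. until that child exits. A supervisor built on this needs no SIGCHLD handler. The sketch assumes the Unix "fork" start method, uses a hypothetical quick_exit worker so the restart path is exercised, and caps restarts with total_restarts only so the demo terminates.

```python
import multiprocessing
from multiprocessing.connection import wait

def quick_exit():
    """Hypothetical worker that returns at once, so it keeps needing a restart."""

def supervise(targets, total_restarts):
    """Restart whichever child exits, until total_restarts restarts have happened."""
    ctx = multiprocessing.get_context("fork")  # Unix-only, like SIGCHLD itself
    procs = {name: ctx.Process(target=t, name=name) for name, t in targets.items()}
    for proc in procs.values():
        proc.start()

    restarted = []
    while len(restarted) < total_restarts:
        # Block until at least one child's sentinel is ready, i.e. that child exited.
        ready = wait([proc.sentinel for proc in procs.values()])
        for name, proc in list(procs.items()):
            if proc.sentinel in ready and len(restarted) < total_restarts:
                proc.join()  # reap the dead child
                procs[name] = ctx.Process(target=targets[name], name=name)
                procs[name].start()
                restarted.append(name)

    # Shutdown path: terminate and reap whatever is still running.
    for proc in procs.values():
        proc.terminate()
        proc.join()
    return restarted

restarted = supervise({"hello": quick_exit, "counter": quick_exit}, total_restarts=4)
```

In a real program the loop would run until a SIGTERM/SIGINT handler asks it to stop, and then take the same shutdown path.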

Final code:

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN, SIG_DFL
from functools import partial
import multiprocessing
#import setproctitle

class HelloWorld(multiprocessing.Process):
    def __init__(self):
        super(HelloWorld, self).__init__()

        # ignore, let parent handle it
        #signal(SIGTERM, SIG_IGN)

    def run(self):

        #setproctitle.setproctitle("helloProcess")

        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()

        self.counter = 1

        # ignore, let parent handle it
        #signal(SIGTERM, SIG_IGN)

    def run(self):

        #setproctitle.setproctitle("counterProcess")

        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(helloProcess, counterProcess, signum, frame):

    print multiprocessing.active_children()
    print "helloProcess: ", helloProcess
    print "counterProcess: ", counterProcess

    print "current_process: ", multiprocessing.current_process()

    if signum == SIGCHLD:  # 17 is SIGCHLD's number only on Linux

        # Since each new child inherits current signal handler,
        # temporarily set it to default before spawning new child.
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, SIG_DFL)

        print "helloProcess: ", helloProcess.is_alive()

        if not helloProcess.is_alive():
            print "Restarting helloProcess"

            helloProcess = HelloWorld()
            helloProcess.start()

        print "counterProcess: ", counterProcess.is_alive()

        if not counterProcess.is_alive():
            print "Restarting counterProcess"

            counterProcess = Counter()
            counterProcess.start()

        # After new children are spawned, revert to old signal handling policy.
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, partial(signal_handler, helloProcess, counterProcess))


    else:

        # Ignore SIGCHLD from the children we are about to terminate
        signal(SIGCHLD, SIG_IGN)

        if helloProcess.is_alive():
            print "Stopping helloProcess"
            helloProcess.terminate()

        if counterProcess.is_alive():
            print "Stopping counterProcess"
            counterProcess.terminate()

        sys.exit(0)



if __name__ == '__main__':

    helloProcess = HelloWorld()
    helloProcess.start()

    counterProcess = Counter()
    counterProcess.start()

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))

    while True:
        print multiprocessing.active_children()
        time.sleep(1)

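To recreate dead children from a signal.SIGCHLD handler, the mother has to call one of the os.wait functions, because Process.is_alive does not work here.
This is possible but complicated, because signal.SIGCHLD is delivered to the mother whenever one of its children changes state, e.g. when a child receives signal.SIGSTOP, signal.SIGCONT or any terminating signal.
So the signal.SIGCHLD handler has to distinguish between these child states. Simply recreating children whenever signal.SIGCHLD is delivered may create more children than necessary.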
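
To see these state changes in isolation, here is a small sketch that uses os.fork() directly (not multiprocessing) with a child that stops itself. The parent's os.waitpid() with os.WUNTRACED and os.WCONTINUED reports the stop, usually the continue, and finally the exit: exactly the sequence a SIGCHLD handler has to tell apart. Unix only; observe_child_states is a hypothetical name.

```python
import os
import signal

def observe_child_states():
    """Fork a child that stops itself, and record each state change the parent sees."""
    pid = os.fork()
    if pid == 0:
        os.kill(os.getpid(), signal.SIGSTOP)  # child: stop until SIGCONT arrives
        os._exit(3)                           # child: then exit with status 3
    events = []
    while True:
        _, status = os.waitpid(pid, os.WUNTRACED | os.WCONTINUED)
        if os.WIFSTOPPED(status):
            events.append("stopped")
            os.kill(pid, signal.SIGCONT)  # let the child carry on
        elif os.WIFCONTINUED(status):
            # May be skipped if the child has already exited by the time we wait.
            events.append("continued")
        elif os.WIFEXITED(status):
            events.append(("exited", os.WEXITSTATUS(status)))
            return events
        elif os.WIFSIGNALED(status):
            events.append(("killed", os.WTERMSIG(status)))
            return events

events = observe_child_states()
```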

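The following code uses os.waitpid with os.WNOHANG to make the call non-blocking, and with os.WUNTRACED and os.WCONTINUED to learn whether the signal.SIGCHLD came from signal.SIGSTOP or signal.SIGCONT.
os.waitpid does not work, i.e. it returns (0, 0), if a Process instance has been printed (i.e. str(Process()) was called) before you call os.waitpid, because printing polls the child and may already have collected its exit status.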

import sys
import time
from signal import signal, pause, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_DFL
import multiprocessing
import os

class HelloWorld(multiprocessing.Process):
    def run(self):
        # reset SIGTERM to default for Process.terminate to work
        signal(SIGTERM, SIG_DFL)
        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()
        self.counter = 1

    def run(self):
        # reset SIGTERM to default for Process.terminate to work
        signal(SIGTERM, SIG_DFL)
        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1


def signal_handler(signum, _):
    global helloProcess, counterProcess

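    if signum == SIGCHLD:
        # Standard signals are not queued, so one SIGCHLD may stand for
        # several children; keep reaping until nothing is left to report.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG|os.WUNTRACED|os.WCONTINUED)
            except OSError:  # ECHILD: no children at all
                return
            if pid == 0:  # children exist, but none has changed state
                return
            if os.WIFCONTINUED(status) or os.WIFSTOPPED(status):
                continue
            if os.WIFSIGNALED(status) or os.WIFEXITED(status):
                if helloProcess.pid == pid:
                    print("Restarting helloProcess")
                    helloProcess = HelloWorld()
                    helloProcess.start()

                elif counterProcess.pid == pid:
                    print("Restarting counterProcess")
                    counterProcess = Counter()
                    counterProcess.start()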

    else:
        # mother shouldn't be notified when it terminates children
        signal(SIGCHLD, SIG_DFL)
        if helloProcess.is_alive():
            print("Stopping helloProcess")
            helloProcess.terminate()

        if counterProcess.is_alive():
            print("Stopping counterProcess")
            counterProcess.terminate()

        sys.exit(0)

if __name__ == '__main__':

    helloProcess = HelloWorld()
    helloProcess.start()

    counterProcess = Counter()
    counterProcess.start()

    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, signal_handler)

    while True:
        pause()
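
The caveat about printing a Process comes from the fact that some Process methods reap the child themselves: Process.is_alive() polls with os.waitpid(..., os.WNOHANG), so once it has observed the exit, the status is gone and a later os.waitpid() on that pid fails. A quick check, assuming the Unix "fork" start method:

```python
import multiprocessing
import os
import time

ctx = multiprocessing.get_context("fork")
proc = ctx.Process(target=time.sleep, args=(0.1,))
proc.start()

# Each is_alive() call polls the child and reaps it once it has exited.
while proc.is_alive():
    time.sleep(0.01)

try:
    os.waitpid(proc.pid, os.WNOHANG)
    status_still_available = True
except ChildProcessError:  # ECHILD: the exit status was already collected
    status_still_available = False
```

Process keeps the collected status, so proc.exitcode is still available afterwards.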

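The following code recreates dead children without using signal.SIGCHLD, so it is simpler than the previous version.
After creating the two children, the mother installs a signal handler named term_child for SIGINT, SIGTERM and SIGQUIT. When called, term_child terminates and joins each child.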

In a while loop, the mother keeps checking whether the children are alive and recreates them when necessary.

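Because each child inherits the signal handlers from the mother, the SIGTERM handler has to be reset to its default in run() for Process.terminate (which sends SIGTERM) to work.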

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIG_DFL
import multiprocessing

class HelloWorld(multiprocessing.Process):    
    def run(self):
        signal(SIGTERM, SIG_DFL)
        while True:
            print "Hello World"
            time.sleep(1)

class Counter(multiprocessing.Process):
    def __init__(self):
        super(Counter, self).__init__()
        self.counter = 1

    def run(self):
        signal(SIGTERM, SIG_DFL)
        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1

def term_child(_, __):
    for child in children:
        child.terminate()
        child.join()
    sys.exit(0)

if __name__ == '__main__':

    children = [HelloWorld(), Counter()]
    for child in children:
        child.start()

    for signame in (SIGINT, SIGTERM, SIGQUIT):
        signal(signame, term_child)

    while True:
        for i, child in enumerate(children):
            if not child.is_alive():
                children[i] = type(child)()
                children[i].start()
        time.sleep(1)
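
Why the reset in run() matters can be checked without multiprocessing: a forked child starts with copies of the parent's Python-level handlers. In this answer the first two children are forked before term_child is installed, but every child recreated in the while loop is forked after it, so it inherits term_child. In the sketch below (Unix only; inherited_handler and child_outcome are hypothetical names, with inherited_handler standing in for term_child), a child that sends itself SIGTERM runs the inherited handler unless it first resets SIGTERM to SIG_DFL.

```python
import os
import signal
import time

def inherited_handler(signum, frame):
    # Stand-in for term_child: exit with a recognizable status instead of dying by SIGTERM.
    os._exit(42)

def child_outcome(reset_sigterm):
    """Fork a child that sends itself SIGTERM; report how the parent saw it end."""
    previous = signal.signal(signal.SIGTERM, inherited_handler)  # parent-side handler
    try:
        pid = os.fork()
        if pid == 0:  # child: starts with a copy of the parent's handlers
            if reset_sigterm:
                signal.signal(signal.SIGTERM, signal.SIG_DFL)  # what run() does above
            os.kill(os.getpid(), signal.SIGTERM)
            while True:  # give an inherited Python-level handler a chance to run
                time.sleep(0.05)
        _, status = os.waitpid(pid, 0)
    finally:
        signal.signal(signal.SIGTERM, previous)  # restore the parent's original handler
    if os.WIFSIGNALED(status):
        return ("killed by", os.WTERMSIG(status))
    return ("exited with", os.WEXITSTATUS(status))
```

Without the reset the child runs the inherited handler and exits with status 42; with it, the child is killed by SIGTERM as Process.terminate expects.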