How to handle abnormal child process termination?

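I'm using Python 3.7 and following this documentation. I want a process that spawns a child process, waits for it to complete a task, and gets some information back from it. I'm using the following code: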

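from multiprocessing import Process, Queue

def some_func(q):
    q.put('result')  # hypothetical placeholder task: send some information back

if __name__ == '__main__':
    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    print(q.get())  # Python 3 print(); blocks until the child puts something on the queue
    p.join()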

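When the child process completes normally there is no problem and this works fine, but the trouble starts when the child terminates before finishing. In that case my application just keeps waiting.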

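Adding a timeout to q.get() and p.join() doesn't fully solve the problem, because I want to know immediately that the child has died rather than wait for the timeout to expire.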

Another problem is that a timeout on q.get() raises an exception (queue.Empty), which I would like to avoid.

Can someone suggest a more elegant way to overcome these problems?

Queue & Signals

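One possibility is to register a signal handler and use it to pass a sentinel value. On Unix you could handle SIGCHLD in the parent, but in your case that isn't an option. According to the signal module documentation: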

On Windows, signal() can only be called with SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, SIGTERM, or SIGBREAK.

I'm not sure whether killing the process through the Task Manager translates into SIGTERM, but you can try it.

To handle SIGTERM, you need to register the signal handler in the child:
import os
import sys
import time
import signal
from functools import partial
from multiprocessing import Process, Queue

SENTINEL = None


def _sigterm_handler(signum, frame, queue):
    print("received SIGTERM")
    queue.put(SENTINEL)  # tell the parent that no more messages are coming
    sys.exit()


def register_sigterm(queue):
    # bind the queue to the handler with partial instead of rebinding the global name
    signal.signal(signal.SIGTERM, partial(_sigterm_handler, queue=queue))


def some_func(q):
    register_sigterm(q)
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        q.put(f'msg_{i}')
    q.put(SENTINEL)  # also mark normal completion, otherwise iter(q.get, SENTINEL) never ends


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    for msg in iter(q.get, SENTINEL):  # read until the sentinel arrives
        print(msg)
    p.join()

Example output:

12273
msg_0
msg_1
msg_2
msg_3
received SIGTERM

Process finished with exit code 0
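On Unix you don't need the Task Manager to exercise this path: Process.terminate() sends SIGTERM there (on Windows it calls TerminateProcess(), which cannot be caught). The sketch below is a variant under that Unix-only assumption, with hypothetical names (`worker`, `run_demo`). Instead of calling Queue.put from inside the handler, which could deadlock if the signal arrives while the main code is already inside q.put, the handler only raises SystemExit and a try/finally in the worker puts the sentinel. That also guarantees the sentinel on normal completion.

```python
import signal
import sys
import time
from multiprocessing import Process, Queue

SENTINEL = None


def _raise_exit(signum, frame):
    # turn SIGTERM into SystemExit so the worker's finally block runs
    sys.exit(1)


def worker(q):
    # hypothetical worker; Unix only, since terminate() on Windows bypasses handlers
    signal.signal(signal.SIGTERM, _raise_exit)
    try:
        for i in range(30):
            q.put(f'msg_{i}')
            time.sleep(0.2)
    finally:
        q.put(SENTINEL)  # sent on normal completion and on SIGTERM alike


def run_demo(delay=0.5):
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    time.sleep(delay)  # let the worker install its handler and send a few messages
    p.terminate()      # sends SIGTERM on Unix
    msgs = list(iter(q.get, SENTINEL))
    p.join()
    return msgs, p.exitcode


if __name__ == '__main__':
    msgs, code = run_demo()
    print(msgs, code)
```

This still depends on the signal being catchable: a SIGKILL, or a SIGTERM that arrives before signal.signal() has run, skips the finally block and leaves the parent waiting.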

Queue & Process.is_alive()

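Even if this works with the Task Manager, your use case sounds like you can't rule out a forced kill (which cannot be caught), so I think you're better off with an approach that doesn't rely on signals.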

You can check in a loop whether your process is still alive with p.is_alive(), call queue.get() with a timeout, and handle the Empty exception:

import os
import time
from queue import Empty
from multiprocessing import Process, Queue

def some_func(q):
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        q.put(f'msg_{i}')


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()

    while p.is_alive():
        try:
            msg = q.get(timeout=0.1)  # wait at most 0.1 s, then re-check is_alive()
        except Empty:
            pass
        else:
            print(msg)

    # drain messages that were still queued when the child exited
    while True:
        try:
            print(q.get_nowait())
        except Empty:
            break

    p.join()
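Once the loop has ended, Process.exitcode tells you how the child ended: 0 for a normal return, n for sys.exit(n), and -N on Unix when it was killed by signal N. The following sketch assumes Unix; the child (`crashing_func`, hypothetical) kills itself with SIGKILL to simulate a forced kill, and `collect` (also hypothetical) wraps the same timeout loop and drain step.

```python
import os
import signal
import time
from queue import Empty
from multiprocessing import Process, Queue


def crashing_func(q):
    # hypothetical child: sends two messages, then gets killed "from outside"
    q.put('msg_0')
    q.put('msg_1')
    time.sleep(0.5)  # give the queue's feeder thread time to flush to the pipe
    os.kill(os.getpid(), signal.SIGKILL)  # simulate a forced kill (Unix only)


def collect(p, q, poll=0.1):
    """Read messages while p is alive; return (messages, exitcode)."""
    msgs = []
    while p.is_alive():
        try:
            msgs.append(q.get(timeout=poll))
        except Empty:
            pass
    while True:  # drain whatever is left after the child has exited
        try:
            msgs.append(q.get_nowait())
        except Empty:
            break
    p.join()
    return msgs, p.exitcode


if __name__ == '__main__':
    q = Queue()
    p = Process(target=crashing_func, args=(q,))
    p.start()
    msgs, code = collect(p, q)
    if code != 0:
        print(f'child died abnormally (exitcode={code}) after {msgs}')
```

A negative exit code is a reliable hint that the child was killed rather than finishing its task; note that anything still buffered in the child's feeder thread at the moment of a SIGKILL is lost.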

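You can also avoid the exception entirely, but I don't recommend it, because the waiting time is then spent sleeping instead of blocking on the queue, which reduces responsiveness: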

while p.is_alive():
    if not q.empty():
        msg = q.get_nowait()
        print(msg)
    time.sleep(0.1)  # outside the if, so an empty queue doesn't cause a busy loop

Pipe & Process.is_alive()

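If you only need one connection per child, you can use a pipe instead of a queue. A pipe is more efficient than a queue (which is built on top of a pipe), and you can use multiprocessing.connection.wait (Python 3.3+) to wait for several objects to become ready at once.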

multiprocessing.connection.wait(object_list, timeout=None)

Wait till an object in object_list is ready. Returns the list of those objects in object_list which are ready. If timeout is a float then the call blocks for at most that many seconds. If timeout is None then it will block for an unlimited period. A negative timeout is equivalent to a zero timeout.

For both Unix and Windows, an object can appear in object_list if it is a readable Connection object; a connected and readable socket.socket object; or the sentinel attribute of a Process object. A connection or socket object is ready when there is data available to be read from it, or the other end has been closed.

Unix: wait(object_list, timeout) almost equivalent select.select(object_list, [], [], timeout). The difference is that, if select.select() is interrupted by a signal, it can raise OSError with an error number of EINTR, whereas wait() will not.

Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function WaitForMultipleObjects()) or it can be an object with a fileno() method which returns a socket handle or pipe handle. (Note that pipe handles and socket handles are not waitable handles.)

You can use it to wait simultaneously on the process's sentinel attribute and the parent's end of the pipe.

import os
import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


def some_func(conn_write):
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        conn_write.send(f'msg_{i}')


if __name__ == '__main__':

    conn_read, conn_write = Pipe(duplex=False)
    p = Process(target=some_func, args=(conn_write,))
    p.start()

    while p.is_alive():
        wait([p.sentinel, conn_read])  # block-wait until something gets ready
        if conn_read.poll():  # check if something can be received
            print(conn_read.recv())

    # the child may have sent more messages right before it exited
    while conn_read.poll():
        print(conn_read.recv())
    p.join()
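Because wait() accepts many objects, the one-pipe-per-child idea extends to several children at once. This is a sketch under stated assumptions: `worker` and `supervise` are hypothetical names, one child deliberately exits abruptly with os._exit(1), and the parent closes its copy of each write end so that recv() raises EOFError once the child's end is gone.

```python
import os
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


def worker(conn, n, crash):
    # hypothetical task: send n results, optionally die without any cleanup
    for i in range(n):
        conn.send(i)
    if crash:
        os._exit(1)  # abrupt exit; data already written to the pipe is kept
    conn.close()


def supervise(specs):
    """specs: list of (n, crash). Returns [(messages, exitcode), ...] in order."""
    procs, results, owner = [], [], {}
    for idx, (n, crash) in enumerate(specs):
        r, w = Pipe(duplex=False)
        p = Process(target=worker, args=(w, n, crash))
        p.start()
        w.close()  # parent keeps only the read end, so EOF shows up when the child is gone
        procs.append(p)
        results.append([])
        owner[r] = idx
        owner[p.sentinel] = idx
    pending = set(owner)
    while pending:
        for obj in wait(list(pending)):
            if isinstance(obj, int):  # a process sentinel: that child has exited
                pending.discard(obj)
                continue
            try:
                results[owner[obj]].append(obj.recv())
            except EOFError:  # everything read and the child's end is closed
                pending.discard(obj)
                obj.close()
    for p in procs:
        p.join()
    return [(msgs, p.exitcode) for msgs, p in zip(results, procs)]


if __name__ == '__main__':
    print(supervise([(3, False), (2, True)]))
```

Waiting on the pipe until EOF, rather than stopping at the sentinel, means no message sent just before a crash is lost, while the exit code still distinguishes the crashed child from the one that finished normally.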