即使在刷新队列后进程也没有加入

Processes not joining even after flushing their queues

我使用模块 multiprocessing 编写了一个程序,该程序全局执行如下:

  1. 同时启动了 simulationui 进程。
  2. simulation 进程为队列提供新的模拟状态。如果队列已满,则模拟循环不会被阻塞,因此它可以处理可能的传入消息。
  3. ui 进程消耗模拟队列。
  4. 在大约 1 秒的执行时间后,ui 进程向主进程发送一个 quit 事件,然后退出循环。退出后,它通过 _create_process() 的内部 wrapper() 函数向主进程发送一个 stopped 事件。
  5. 主进程以任意顺序接收这两个事件。 quit 事件导致主进程向所有子进程发送 stop 信号,而 stopped 事件在主循环中增加一个计数器,这将导致它在接收到作为有很多 stopped 个事件,因为有进程。
  6. simulation 进程接收到 stop 事件并退出循环,进而向主进程发送一个 stopped 事件。
  7. 主进程现在总共收到了 2 个 stopped 事件,并得出结论认为所有子进程都将被停止。结果退出了主循环
  8. run() 函数刷新已由子进程写入的队列。
  9. 正在加入子进程。

问题是,根据下面的日志,程序经常(但不总是)会在尝试加入 simulation 进程时挂起。

[...]
[INFO/ui] process exiting with exitcode 0
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
[DEBUG/simulation] Queue._start_thread()
[DEBUG/simulation] doing self._thread.start()
[DEBUG/simulation] starting thread to feed data to pipe
[DEBUG/simulation] ... done self._thread.start()
[DEBUG/simulation] telling queue thread to quit
[DEBUG/MainProcess] all child processes (2) should have been stopped!
[INFO/simulation] process shutting down
[DEBUG/simulation] running all "atexit" finalizers with priority >= 0
[DEBUG/simulation] telling queue thread to quit
[DEBUG/simulation] running the remaining "atexit" finalizers
[DEBUG/simulation] joining queue thread
[DEBUG/MainProcess] joining process <Process(simulation, started)>
[DEBUG/simulation] feeder thread got sentinel -- exiting
[DEBUG/simulation] ... queue thread joined
[DEBUG/simulation] joining queue thread

通过 shell 中的 Ctrl + C 停止执行会导致这些损坏的回溯:

Process simulation:
Traceback (most recent call last):
Traceback (most recent call last):
  File "./debug.py", line 224, in <module>
    run()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/process.py", line 257, in _bootstrap
    util._exit_function()
  File "./debug.py", line 92, in run
    process.join()  #< This doesn't work.
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 312, in _exit_function
    _run_finalizers()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/process.py", line 121, in join
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 252, in _run_finalizers
    finalizer()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 185, in __call__
    res = self._callback(*self._args, **self._kwargs)
    res = self._popen.wait(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/popen_fork.py", line 54, in wait
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 196, in _finalize_join
    thread.join()
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/popen_fork.py", line 30, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 1060, in join
    self._wait_for_tstate_lock()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 1076, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

至于代码,这里是它的精简版本(因此它经常看起来不完整):

#!/usr/bin/env python3

import logging
import multiprocessing
import pickle
import queue
import time

from collections import namedtuple

_LOGGER = multiprocessing.log_to_stderr()
_LOGGER.setLevel(logging.DEBUG)

_BUFFER_SIZE = 4
_DATA_LENGTH = 2 ** 12

_STATUS_SUCCESS = 0
_STATUS_FAILURE = 1

_EVENT_ERROR = 0
_EVENT_QUIT = 1
_EVENT_STOPPED = 2

_MESSAGE_STOP = 0
_MESSAGE_EVENT = 1
_MESSAGE_SIMULATION_UPDATE = 2

_Message = namedtuple('_Message', ('type', 'value',))
_StopMessage = namedtuple('_StopMessage', ())
_EventMessage = namedtuple('_EventMessage', ('type', 'value',))
_SimulationUpdateMessage = namedtuple('_SimulationUpdateMessage', ('state',))

_MESSAGE_STRUCTS = {
    _MESSAGE_STOP: _StopMessage,
    _MESSAGE_EVENT: _EventMessage,
    _MESSAGE_SIMULATION_UPDATE: _SimulationUpdateMessage
}

def run():
    # Messages from the main process to the child ones.
    downward_queue = multiprocessing.Queue()
    # Messages from the child processes to the main one.
    upward_queue = multiprocessing.Queue()
    # Messages from the simulation process to the UI one.
    simulation_to_ui_queue = multiprocessing.Queue(maxsize=_BUFFER_SIZE)

    # Regroup all the queues that can be written by child processes.
    child_process_queues = (upward_queue, simulation_to_ui_queue,)

    processes = (
        _create_process(
            _simulation,
            upward_queue,
            name='simulation',
            args=(
                simulation_to_ui_queue,
                downward_queue
            )
        ),
        _create_process(
            _ui,
            upward_queue,
            name='ui',
            args=(
                upward_queue,
                simulation_to_ui_queue,
                downward_queue
            )
        )
    )

    try:
        for process in processes:
            process.start()

        _main(downward_queue, upward_queue, len(processes))
    finally:
        # while True:
        #     alive_processes = tuple(process for process in processes
        #                             if process.is_alive())
        #     if not alive_processes:
        #         break

        #     _LOGGER.debug("processes still alive: %s" % (alive_processes,))

        for q in child_process_queues:
            _flush_queue(q)

        for process in processes:
            _LOGGER.debug("joining process %s" % process)
            # process.terminate()  #< This works!
            process.join()  #< This doesn't work.

def _main(downward_queue, upward_queue, process_count):
    try:
        stopped_count = 0
        while True:
            message = _receive_message(upward_queue, False)
            if message is not None and message.type == _MESSAGE_EVENT:
                event_type = message.value.type
                if event_type in (_EVENT_QUIT, _EVENT_ERROR):
                    break
                elif event_type == _EVENT_STOPPED:
                    stopped_count += 1
                    if stopped_count >= process_count:
                        break
    finally:
        # Whatever happens, make sure that all child processes have stopped.
        if stopped_count >= process_count:
            return

        # Send a 'stop' signal to all the child processes.
        for _ in range(process_count):
            _send_message(downward_queue, True, _MESSAGE_STOP)

        while True:
            message = _receive_message(upward_queue, False)
            if (message is not None
                    and message.type == _MESSAGE_EVENT
                    and message.value.type == _EVENT_STOPPED):
                stopped_count += 1
                if stopped_count >= process_count:
                    _LOGGER.debug(
                        "all child processes (%d) should have been stopped!"
                        % stopped_count
                    )
                    break

def _simulation(simulation_to_ui_queue, downward_queue):
    simulation_state = [i * 0.123 for i in range(_DATA_LENGTH)]

    # When the queue is full (possibly form reaching _BUFFER_SIZE), the next
    # solve is computed and kept around until the queue is being consumed.
    next_solve_message = None
    while True:
        message = _receive_message(downward_queue, False)
        if message is not None and message.type == _MESSAGE_STOP:
            break

        if next_solve_message is None:
            # _step(simulation_state)

            # Somehow the copy (pickle) seems to increase the chances for
            # the issue to happen.
            next_solve_message = _SimulationUpdateMessage(
                state=pickle.dumps(simulation_state)
            )

        status = _send_message(simulation_to_ui_queue, False,
                               _MESSAGE_SIMULATION_UPDATE,
                               **next_solve_message._asdict())
        if status == _STATUS_SUCCESS:
            next_solve_message = None

def _ui(upward_queue, simulation_to_ui_queue, downward_queue):
    time_start = -1.0
    previous_time = 0.0
    while True:
        message = _receive_message(downward_queue, False)
        if message is not None and message.type == _MESSAGE_STOP:
            break

        if time_start < 0:
            current_time = 0.0
            time_start = time.perf_counter()
        else:
            current_time = time.perf_counter() - time_start

        message = _receive_message(simulation_to_ui_queue, False)

        if current_time > 1.0:
            _LOGGER.debug("asking to quit")
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_QUIT, value=None)
            break

        previous_time = current_time

def _create_process(target, upward_queue, name='', args=None):
    def wrapper(function, upward_queue, *args, **kwargs):
        try:
            function(*args, **kwargs)
        except Exception:
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_ERROR, value=None)
        finally:
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_STOPPED, value=None)
            upward_queue.close()

    process = multiprocessing.Process(
        target=wrapper,
        name=name,
        args=(target, upward_queue) + args,
        kwargs={}
    )
    return process

def _receive_message(q, block):
    try:
        message = q.get(block=block)
    except queue.Empty:
        return None

    return message

def _send_message(q, block, message_type, **kwargs):
    message_value = _MESSAGE_STRUCTS[message_type](**kwargs)
    try:
        q.put(_Message(type=message_type, value=message_value), block=block)
    except queue.Full:
        return _STATUS_FAILURE

    return _STATUS_SUCCESS

def _flush_queue(q):
    try:
        while True:
            q.get(block=False)
    except queue.Empty:
        pass

if __name__ == '__main__':
    run()

关于 Whosebug 的相关问题和 Python 文档中的提示基本上可以归结为需要在加入流程之前刷新队列,我相信我一直在尝试这样做。我意识到,当程序在退出时尝试刷新它们时,模拟队列可能仍在尝试将(可能很大的)缓冲数据推送到管道上,因此最终仍然是非空队列。这就是为什么我试图确保在达到这一点之前停止所有子进程。现在,查看上面的日志和取消注释 while True 循环检查活动进程后输出的附加日志,似乎 simulation 进程根本不想完全关闭,即使它的目标函数肯定退出了。这可能是我遇到问题的原因吗?

如果是这样,我应该如何干净地处理它?否则,我在这里错过了什么?

在 Mac OS X 10.9.5.

上使用 Python 3.4 进行了测试

PS:我想知道这是否与 this bug 有关?

听起来这个问题确实是由于在将数据推送到队列时出现了一些延迟,导致刷新无效,因为触发得太早了。

一个简单的 while process.is_alive(): flush_the_queues() 似乎可以解决问题![​​=11=]

最近我 运行 进入了一个与您类似的用例:多个进程(最多 11 个)、一个输入队列、一个输出队列。但是输出队列很重。 根据您的建议,我在 process.join().

之前执行 while process.is_alive(): flush_the_queues() 的开销最多为 5 秒 (!)

我已将输出队列的 multiprocessing.Manager.list 而不是 multiprocessing.Queue 的开销减少到 0.7 秒。 multiprocessing.Manager.list 不需要任何冲洗。如果可以的话,我可能还会考虑寻找输入队列的替代方案..

完整示例在这里:

import multiprocessing
import queue
import time


PROCESSES = multiprocessing.cpu_count() - 1
processes = []


def run():
    start = time.time()

    input_queue = multiprocessing.Queue()
    feed_input_queue(input_queue)

    with multiprocessing.Manager() as manager:
        output_list = manager.list()

        for _ in range(PROCESSES):
            p = multiprocessing.Process(target=_execute, args=(input_queue, output_list))
            processes.append(p)
            p.start()

        print(f"Time to process = {time.time() - start:.10f}")

        start = time.time()
        
        for p in processes:
            while p.is_alive():  # in principle we could get rid of this if we find an alternative to the output queue
                _flush_queue(input_queue)
            p.join()

        print(f"Time to join = {time.time() - start:.10f}")
        # from here you can do something with the output_list


def _feed_input_queue(input_queue):
    for i in range(10000):
        input_queue.put(i)


def _execute(input_queue: multiprocessing.Queue, output_list: list):
    while not input_queue.empty():
        input_item = input_queue.get()
        output_list.append(do_and_return_something_heavy(input_item))
        return True


def _flush_queue(q):
    try:
        while True:
            q.get(block=False)
    except queue.Empty:
        pass


def do_and_return_something_heavy(input_item):
    return str(input_item) * 100000


if __name__ == '__main__':
    run()

输出

Time to process = 0.1855618954
Time to join = 0.6889970303

在 Python 3.6.

上测试