CPU locked when using multiprocessing in Python

My code:

import multiprocessing as mp
import Queue  # Python 2 stdlib module; renamed to "queue" in Python 3


def create_rods(folder="./", kappas=10, allowed_kappa_error=.3,
                radius_correction_ratio=0.1):
    """
    Create one rod for each rod_data and for each file
    returns [RodGroup1, RodGroup2, ...]
    """
    names, files = import_files(folder=folder)
    if len(files) == 0:
        raise ValueError("No files to import.")
    states = [None] * len(files)
    processes = []
    states_queue = mp.Queue()
    for index in range(len(files)):
        process = mp.Process(target=create_rods_process,
                        args=(kappas, allowed_kappa_error,
                        radius_correction_ratio, names,
                        files, index, states_queue))
        processes.append(process)
    run_processes(processes)        # This part seems to take a lot of time.
    try:
        while True:
            [index, state] = states_queue.get(False)
            states[index] = state
    except Queue.Empty:
        pass    
    return names, states

def create_rods_process(kappas, allowed_kappa_error,
                    radius_correction_ratio, names,
                    files, index, states_queue):
    """
    Process of method.
    """
    state = SystemState(kappas, allowed_kappa_error,
                radius_correction_ratio, names[index])
    data = import_data(files[index])
    for dataline in data:
        parameters = tuple(dataline)
        new_rod = Rod(parameters)
        state.put_rod(new_rod)
    state.check_rods()
    states_queue.put([index, state])

def run_processes(processes, time_out=None):
    """
    Runs all processes using all cores.
    """
    running = []
    cpus = mp.cpu_count()
    try:
        while True:
        #for cpu in range(cpus):
            next_process = processes.pop()
            running.append(next_process)
            next_process.start()
    except IndexError:
        pass
    if not time_out:
        try:
            while True:
                for process in running:
                    if not process.is_alive():
                        running.remove(process)
        except TypeError:
            pass
    else:
        for process in running:
            process.join(time_out)

I expect the processes to finish, but one of them gets stuck. I don't know whether the problem is in the run_processes() method or in the create_rods() method. As the joins complete, CPUs are freed, but the program does not continue.

From Python's multiprocessing programming guidelines:

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
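The cancel_join_thread escape hatch mentioned in the quote can be sketched like this (a minimal Python 3 example; the worker function and queue names are made up for illustration). The child calls cancel_join_thread before exiting, so it no longer waits for the feeder thread to flush its buffered items, and the parent's join cannot hang on it, at the cost that unflushed items may be lost:

```python
import multiprocessing as mp

def worker(q):
    q.put("partial result")
    # Allow this child to exit without waiting for the queue's feeder
    # thread to flush its buffer; any unflushed items may be lost.
    q.cancel_join_thread()

def run_worker():
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    p.join()  # completes even though the queue is never drained
    return p.exitcode

if __name__ == "__main__":
    print(run_worker())  # 0 on a clean exit
```

This is rarely the right default, though: if you need the results, you should drain the queue instead of discarding it.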

Joining a process before draining its queue causes a deadlock. You need to make sure the queue has been emptied before the processes are joined.
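A minimal Python 3 sketch of that ordering, with a hypothetical worker standing in for create_rods_process: each child puts exactly one [index, result] pair on the queue, the parent drains one item per child with a blocking get, and only then joins. Because every queued item is consumed before join, the feeder threads can flush and the children terminate:

```python
import multiprocessing as mp

def worker(index, results_queue):
    # Stand-in for create_rods_process: compute one result and queue it.
    results_queue.put([index, index * index])

def run_and_collect(n_workers):
    results_queue = mp.Queue()
    processes = [mp.Process(target=worker, args=(i, results_queue))
                 for i in range(n_workers)]
    for p in processes:
        p.start()
    # Drain the queue BEFORE joining: a child that has put items on the
    # queue will not terminate until its buffer is flushed to the pipe.
    states = [None] * n_workers
    for _ in range(n_workers):
        index, state = results_queue.get()  # blocks until an item arrives
        states[index] = state
    # Now it is safe to join: every queued item has been consumed.
    for p in processes:
        p.join()
    return states

if __name__ == "__main__":
    print(run_and_collect(4))  # -> [0, 1, 4, 9]
```

Note this relies on knowing how many items will be queued (one per child); with a variable number of results you would have the workers put a sentinel value when done, or loop on get with a timeout.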