Creating a Queue delay in a Python pool without blocking

I have a large program (specifically, a function) that I'm trying to parallelize using a JoinableQueue and the multiprocessing map_async method. The function I'm working with performs several operations on multidimensional arrays, so I split each array into sections and evaluate each section independently; however, I need to stitch one of the arrays back together early on, but the "stitch" happens before the "evaluate", so I need to introduce some kind of delay in the JoinableQueue. I've searched all over for a workable solution, but I'm very new to multiprocessing and most of it goes over my head.

That wording is probably confusing - sorry. Here's an outline of my code (I can't write it all out because it's very long, but I can give additional details if needed):

import numpy as np
import multiprocessing as mp
from multiprocessing import Pool, Pipe, JoinableQueue

def main_function(section_number):

    #define section sizes
    array_this_section = array[:,start:end+1,:]
    histogram_this_section = np.zeros((3, dataset_size, dataset_size))
    #start and end are defined according to the size of the array
    #dataset_size is to show that the histogram is a different size than the array

    for m in range(1,num_iterations+1):
        #do several operations - each section of the array
        #corresponds to a section on the histogram

        hist_queue.put(histogram_this_section)

        #each process sends their own part of the histogram outside of the pool
        #to be combined with every other part - later operations
        #in this function must use the full histogram

        hist_queue.join()
        full_histogram = full_hist_queue.get()
        full_hist_queue.task_done()

        #do many more operations


hist_queue = JoinableQueue()
full_hist_queue = JoinableQueue()

if __name__ == '__main__':
    pool = mp.Pool(num_sections)
    args = np.arange(num_sections)
    pool.map_async(main_function, args, chunksize=1)    

    #I need the map_async because the program is designed to display an output at the
    #end of each iteration, and each output must be a compilation of all processes

    #a few variable definitions go here

    for m in range(1,num_iterations+1):
        for i in range(num_sections):
            temp_hist = hist_queue.get()    #the code hangs here because the queue 
                                            #is attempting to get before anything 
                                            #has been put
            hist_full += temp_hist
        for i in range(num_sections):
            hist_queue.task_done()
        for i in range(num_sections):
            full_hist_queue.put(hist_full)    #the full histogram is sent back into 
                                              #the pool


            full_hist_queue.join()

        #etc etc

    pool.close()
    pool.join()

I'm pretty sure your problem is how you're creating your Queues and trying to share them with your child processes. If you just have them as global variables, they may be recreated in the child processes instead of inherited (the exact details depend on what OS and/or context you're using for multiprocessing).
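
If you want to confirm which situation you're in, you can ask multiprocessing directly which start method is in effect (get_start_method is standard library API, nothing specific to your code). "fork" lets children inherit module-level objects, while "spawn" re-imports your module in each child and rebuilds those globals as brand-new, unrelated Queues:

import multiprocessing as mp

if __name__ == '__main__':
    # Linux defaults to "fork"; Windows (and macOS on recent Pythons) default to "spawn".
    # Under "spawn", your module-level hist_queue/full_hist_queue are re-created in every
    # child process, so the workers and the main process end up with different queues.
    print(mp.get_start_method())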

The better way to approach this is to avoid using multiprocessing.Pool to spawn your processes, and instead explicitly create Process instances for your workers yourself. That way you can pass the Queue instances to the processes that need them without any difficulty (it's technically possible to pass the queues to Pool workers, but it's awkward).
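
For completeness, the "awkward" route that keeps the Pool is to pass the queues through the pool's initializer hook, which runs once in every worker and can stash them in that worker's globals. This is just a sketch of that workaround (initializer/initargs are standard Pool arguments; num_sections and main_function are from your code):

import multiprocessing as mp

def init_worker(hq, fhq):
    # runs once in each worker process; make the queues visible to main_function
    global hist_queue, full_hist_queue
    hist_queue = hq
    full_hist_queue = fhq

if __name__ == '__main__':
    hist_queue = mp.JoinableQueue()
    full_hist_queue = mp.JoinableQueue()
    pool = mp.Pool(num_sections, initializer=init_worker,
                   initargs=(hist_queue, full_hist_queue))
    pool.map_async(main_function, range(num_sections), chunksize=1)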

I'd try something like this:

def worker_function(section_number, hist_queue, full_hist_queue): # take queues as arguments
    # ... the rest of the function can work as before
    # note, I renamed this from "main_function" since it's not running in the main process

if __name__ == '__main__':
    hist_queue = JoinableQueue()   # create the queues only in the main process
    full_hist_queue = JoinableQueue()  # the workers don't need to access them as globals

    processes = [Process(target=worker_function, args=(i, hist_queue, full_hist_queue))
                 for i in range(num_sections)]
    for p in processes:
        p.start()

    # ...
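
On the main-process side you can then keep essentially the combining loop you already have: drain one partial histogram per section, sum them, mark them done so the workers' hist_queue.join() returns, and hand the full histogram back. A fuller sketch of that, under the same assumptions (worker_function, num_sections, num_iterations, dataset_size and the histogram shape all come from your code; Process is imported from multiprocessing and numpy as np):

    # ... continuing inside the if __name__ == '__main__': block from above

    for m in range(1, num_iterations + 1):
        hist_full = np.zeros((3, dataset_size, dataset_size))  # fresh accumulator each iteration
        for i in range(num_sections):
            hist_full += hist_queue.get()       # one partial histogram per worker
        for i in range(num_sections):
            hist_queue.task_done()              # lets the workers' hist_queue.join() return
        for i in range(num_sections):
            full_hist_queue.put(hist_full)      # one copy of the full histogram per worker
        full_hist_queue.join()                  # wait until every worker has taken its copy

        # display the per-iteration output built from hist_full here

    for p in processes:
        p.join()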

If the different phases of your worker function are more or less independent of one another (that is, the "do many more operations" step doesn't depend directly on the "do several operations" step above it, only on full_histogram), you might be able to keep using the Pool and instead split the different steps into separate functions that the main process calls via several calls to map on the pool. With this approach you don't need to use your own Queues at all, just the ones built into the pool. This may well be the best option, especially if the number of "sections" you're dividing the work into doesn't correspond neatly to the number of processor cores in your computer. You can let the Pool match the number of cores and have each core work on several sections of the data in turn.

A rough sketch of that might look like this:

import itertools
import multiprocessing

import numpy as np

def worker_make_hist(section_number):
    # do several operations, get a partial histogram
    return histogram_this_section

def worker_do_more_ops(section_number, full_histogram):
    # whatever...
    return some_result

if __name__ == "__main__":
    pool = multiprocessing.Pool()  # by default the size will be equal to the number of cores

    hist_full = np.zeros((3, dataset_size, dataset_size))  # same shape as in your worker
    for temp_hist in pool.imap_unordered(worker_make_hist, range(number_of_sections)):
        hist_full += temp_hist

    some_results = pool.starmap(worker_do_more_ops, zip(range(number_of_sections),
                                                        itertools.repeat(hist_full)))
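
Since you mentioned the program needs to display an output at the end of every iteration, built from all processes, you can simply wrap both pool calls in the iteration loop: each pass rebuilds the partial histograms in parallel, combines them in the main process, and then farms the follow-up work back out with the combined result. A sketch under the same assumptions, continuing the if __name__ == "__main__": block above:

    for m in range(1, num_iterations + 1):
        # phase 1: build the partial histograms in parallel, combine them here
        hist_full = np.zeros((3, dataset_size, dataset_size))
        for temp_hist in pool.imap_unordered(worker_make_hist, range(number_of_sections)):
            hist_full += temp_hist

        # phase 2: give every section the full histogram for the follow-up work
        some_results = pool.starmap(worker_do_more_ops,
                                    zip(range(number_of_sections),
                                        itertools.repeat(hist_full)))

        # compile/display the per-iteration output from some_results here

If the per-section work also depends on the iteration number m, you can pass it along as an extra argument in the same zip/starmap call.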