Creating a Queue delay in a Python pool without blocking
I have a large program (specifically, one function) that I'm trying to parallelize using a JoinableQueue and multiprocessing's map_async method. The function I'm working with performs several operations on multidimensional arrays, so I break each array into sections and each section is evaluated independently; however, I need to stitch one of the arrays back together early on, and that "stitch" happens before the "evaluate", so I need to introduce some kind of delay into the JoinableQueue. I've searched everywhere for a workable solution, but I'm very new to multiprocessing and most of it goes over my head.
This phrasing may be confusing, sorry. Here's an outline of my code (I can't include all of it because it's very long, but I can provide additional details if needed):
import numpy as np
import multiprocessing as mp
from multiprocessing import Pool, Pipe, JoinableQueue

def main_function(section_number):

    #define section sizes
    array_this_section = array[:,start:end+1,:]
    histogram_this_section = np.zeros((3, dataset_size, dataset_size))
    #start and end are defined according to the size of the array
    #dataset_size is to show that the histogram is a different size than the array

    for m in range(1,num_iterations+1):
        #do several operations- each section of the array
        #corresponds to a section on the histogram

        hist_queue.put(histogram_this_section)
        #each process sends their own part of the histogram outside of the pool
        #to be combined with every other part- later operations
        #in this function must use the full histogram
        hist_queue.join()
        full_histogram = full_hist_queue.get()
        full_hist_queue.task_done()

        #do many more operations

hist_queue = JoinableQueue()
full_hist_queue = JoinableQueue()

if __name__ == '__main__':

    pool = mp.Pool(num_sections)
    args = np.arange(num_sections)
    pool.map_async(main_function, args, chunksize=1)
    #I need the map_async because the program is designed to display an output at the
    #end of each iteration, and each output must be a compilation of all processes

    #a few variable definitions go here

    for m in range(1,num_iterations+1):

        for i in range(num_sections):
            temp_hist = hist_queue.get()    #the code hangs here because the queue
                                            #is attempting to get before anything
                                            #has been put
            hist_full += temp_hist

        for i in range(num_sections):
            hist_queue.task_done()

        for i in range(num_sections):
            full_hist_queue.put(hist_full)    #the full histogram is sent back into
                                              #the pool

        full_hist_queue.join()

        #etc etc

    pool.close()
    pool.join()
I'm pretty sure your problem is in how you're creating the Queues and trying to share them with the child processes. If you just have them as global variables, they may be recreated in the child processes instead of being inherited (the exact details depend on what OS and/or start method you're using for multiprocessing).
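To see concretely what "recreated instead of being inherited" means, here is a minimal, self-contained sketch (not from your code; the names are illustrative) showing that with the "spawn" start method a module-level queue in the child is a brand-new object, so anything the child puts never reaches the parent:

import multiprocessing as mp

ctx = mp.get_context('spawn')     # the start method Windows and macOS use by default

# A module-level queue: under "spawn" the child re-imports this module and
# re-runs this line, so the child ends up with its own, separate queue.
hist_queue = ctx.JoinableQueue()

def child():
    hist_queue.put("partial histogram")   # lands in the child's private copy

if __name__ == '__main__':
    p = ctx.Process(target=child)
    p.start()
    p.join()
    print(hist_queue.empty())             # True: the parent's queue never saw the item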
A better way to go about this is to avoid using multiprocessing.Pool to spawn your processes and instead explicitly create Process instances for your workers yourself. That way you can pass the Queue instances to the processes that need them without any difficulty (it's technically possible to pass the queues to Pool workers, but it's awkward).
I'd try something like this:
from multiprocessing import Process, JoinableQueue

def worker_function(section_number, hist_queue, full_hist_queue):  # take the queues as arguments
    ...  # the rest of the function can work as before
    # note, I renamed this from "main_function" since it's not running in the main process

if __name__ == '__main__':
    hist_queue = JoinableQueue()       # create the queues only in the main process
    full_hist_queue = JoinableQueue()  # the workers don't need to access them as globals
    processes = [Process(target=worker_function, args=(i, hist_queue, full_hist_queue))
                 for i in range(num_sections)]
    for p in processes:
        p.start()
    # ...
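For completeness, the "technically possible but awkward" Pool route goes through the pool's initializer, which runs once in each worker process at creation time and can therefore receive the queues (handing them over later, e.g. as map arguments, typically raises a RuntimeError). A rough sketch under that assumption, reusing the names from your outline (num_sections is assumed to be defined):

from multiprocessing import Pool, JoinableQueue

_hist_queue = None        # filled in inside each worker by the initializer
_full_hist_queue = None

def _init_worker(hist_q, full_hist_q):
    # Runs once per pool worker; queues passed via initargs are handed over
    # at process-creation time, which multiprocessing allows.
    global _hist_queue, _full_hist_queue
    _hist_queue = hist_q
    _full_hist_queue = full_hist_q

def worker_function(section_number):
    ...                                   # same body as before, but using the
    _hist_queue.put(section_number)       # module globals set by _init_worker

if __name__ == '__main__':
    hist_queue = JoinableQueue()
    full_hist_queue = JoinableQueue()
    pool = Pool(num_sections, initializer=_init_worker,
                initargs=(hist_queue, full_hist_queue))
    pool.map_async(worker_function, range(num_sections), chunksize=1)
    # ... the main-process side of the queue exchange goes here, as before ...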
If the different stages of your worker function are more or less independent of one another (that is, if the "do many more operations" step doesn't rely directly on the "do several operations" step above it, only on full_histogram), you may be able to keep the Pool and instead split the different steps into separate functions that the main process calls via several calls to map on the pool. With this approach you don't need your own Queues at all, just the ones built into the pool. This may well be best, especially if the number of "sections" you split the work into doesn't correspond closely to the number of processor cores on your computer: you can let the Pool match the number of cores and have each one work on several sections of the data in turn.
A rough sketch of this would look something like:
import itertools
import multiprocessing

def worker_make_hist(section_number):
    # do several operations, get a partial histogram
    return histogram_this_section

def worker_do_more_ops(section_number, full_histogram):
    # whatever...
    return some_result

if __name__ == "__main__":
    pool = multiprocessing.Pool()  # by default the size will be equal to the number of cores

    for temp_hist in pool.imap_unordered(worker_make_hist, range(number_of_sections)):
        hist_full += temp_hist

    some_results = pool.starmap(worker_do_more_ops, zip(range(number_of_sections),
                                                        itertools.repeat(hist_full)))
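Since your program repeats this exchange once per iteration and displays a combined result each time, the two pool calls would simply sit inside the iteration loop. A rough sketch of that arrangement (num_iterations, dataset_size and number_of_sections come from your outline and are assumed to be defined, as are the two worker functions above):

import itertools
import multiprocessing
import numpy as np

if __name__ == "__main__":
    pool = multiprocessing.Pool()          # size defaults to the number of cores

    for m in range(1, num_iterations + 1):
        # Phase 1: build the partial histograms in parallel and stitch them together.
        hist_full = np.zeros((3, dataset_size, dataset_size))
        for temp_hist in pool.imap_unordered(worker_make_hist, range(number_of_sections)):
            hist_full += temp_hist

        # Phase 2: fan the stitched histogram back out for the remaining operations.
        some_results = pool.starmap(worker_do_more_ops,
                                    zip(range(number_of_sections),
                                        itertools.repeat(hist_full)))

        # ... display this iteration's output built from some_results ...

    pool.close()
    pool.join()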