Run function in parallel and grab outputs using Queue

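I would like to run a function with different arguments. For each argument, I want to run the function in parallel and then get the output of each run. The multiprocessing module looks like it can help here, but I am not sure about the right steps to make this work.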

Do I start all the processes, then get all the queues and then join all the processes in this order? Or do I get the results after I have joined? Or do I get the ith result after I have joined the ith process?

from numpy.random import uniform
from multiprocessing import Process, Queue

def function(x):
    return uniform(0.0, x)

if __name__ == "__main__":
    queue = Queue()
    processes = []
    x_values = [1.0, 10.0, 100.0]
    
    # Start all processes
    for x in x_values:
        process = Process(target=function, args=(x, queue, ))
        processes.append(process)
        process.start()

    # Grab results of the processes?
    outputs = [queue.get() for _ in range(len(x_values))]
    
    # Not even sure what this does but apparently it's needed
    for process in processes:
        process.join()

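process.join() blocks until the process has ended. queue.get() blocks until there is something in the queue. Because your processes never put anything into the queue (in this example), this code will never get past the queue.get() part. If your processes put something on the queue at the very end, then for small results it matters little whether you join() or get() first, because they happen at about the same time. Calling get() first is the safer order, though: the multiprocessing documentation warns that a process which has put items on a queue may not terminate until those items have been consumed, so joining first can deadlock.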
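Here is a minimal sketch of how the code in the question could be fixed, assuming the worker is changed to accept the queue and put its result on it. The names worker and run_all are hypothetical, and the standard-library random.uniform stands in for numpy.random.uniform so the snippet has no third-party dependency. Each result is tagged with its input because results arrive in completion order, not in the order the processes were started.

```python
from multiprocessing import Process, Queue
from random import uniform  # stand-in for numpy.random.uniform (assumption)


def worker(x, queue):
    # Put (input, output) on the queue so the parent can match results to inputs.
    queue.put((x, uniform(0.0, x)))


def run_all(x_values):
    queue = Queue()
    processes = [Process(target=worker, args=(x, queue)) for x in x_values]

    # Start all processes.
    for process in processes:
        process.start()

    # Drain the queue first: one get() per started process.
    outputs = [queue.get() for _ in processes]

    # Then wait for every process to exit.
    for process in processes:
        process.join()
    return outputs


if __name__ == "__main__":
    for x, y in sorted(run_all([1.0, 10.0, 100.0])):
        print(x, y)
```

So the order here is: start everything, get everything, then join everything.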

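So let's make a simple example with a multiprocessing Pool. It has a dummy workload function that sleeps for 3 seconds and returns the value passed to it (your parameter) together with the result of the function, which here just doubles the value. IIRC there are some issues with stopping a pool cleanly.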

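from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception):
    """User-defined exception so Ctrl+C in a worker reaches the parent as a normal error."""

def time_waster(val):
    try:
        time.sleep(3)
        # Return a tuple here, but you can use a dict as well with all your parameters
        return (val, val * 2)
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

if __name__ == '__main__':
    x = list(range(5))  # values to pass to the function
    results = []
    p = Pool(2)  # I use 2 but you can use as many as you have cores
    try:
        # map() blocks until all calls are done and returns results in input order
        results.extend(p.map(time_waster, x))
        p.close()  # no more work will be submitted
    except KeyboardInterrupt:
        p.terminate()
    except Exception:
        p.terminate()
    finally:
        p.join()  # wait for the worker processes to exit
    print(results)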

As an added service, I included some keyboard interrupt handling, since IIRC there are some problems with interrupting pools.
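Applied to the function from the question, the same idea might look like the sketch below. It assumes that results in input order are what you want. run_with_pool and its workers parameter are hypothetical names, and random.uniform again stands in for numpy.random.uniform.

```python
from multiprocessing import Pool
from random import uniform  # stand-in for numpy.random.uniform (assumption)


def function(x):
    # Same body as in the question: a random draw between 0 and x.
    return uniform(0.0, x)


def run_with_pool(x_values, workers=2):
    # map() blocks until every call has finished and returns results in input order.
    with Pool(workers) as pool:
        return pool.map(function, x_values)


if __name__ == "__main__":
    x_values = [1.0, 10.0, 100.0]
    for x, y in zip(x_values, run_with_pool(x_values)):
        print(x, y)
```

With a Pool there is no queue to manage and no explicit join() in your own code; the return values of the function come back directly from map().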