Run function in parallel and grab outputs using Queue
I want to run `function` with different parameters. For each parameter, I want to run the function in parallel and then grab the output of each run. The multiprocessing module seems like it can help. I'm not sure about the right steps to make this work.
Do I start all the processes, then get
all the queues and then join all the processes in this order? Or do I get
the results after I have joined? Or do I get the ith result after I have joined the ith process?
from numpy.random import uniform
from multiprocessing import Process, Queue

def function(x):
    return uniform(0.0, x)

if __name__ == "__main__":
    queue = Queue()
    processes = []
    x_values = [1.0, 10.0, 100.0]

    # Start all processes
    for x in x_values:
        process = Process(target=function, args=(x, queue))
        processes.append(process)
        process.start()

    # Grab results of the processes?
    outputs = [queue.get() for _ in range(len(x_values))]

    # Not even sure what this does but apparently it's needed
    for process in processes:
        process.join()
process.join() blocks until the process ends. queue.get() blocks until there is something in the queue. Since your processes don't put anything into the queue (in this example), this code never gets past the queue.get() part... If your processes put something into the queue at the end, it doesn't really matter whether you join() first or get() first, since both happen at roughly the same time.
So let's make a simple example with a multiprocessing Pool: a load function that sleeps for 3 seconds and returns the value passed to it (your parameter) together with the result of the function, which simply doubles it. IIRC there are some issues with stopping a pool cleanly:
from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception):
    pass

def time_waster(val):
    try:
        time.sleep(3)
        return (val, val * 2)  # return a tuple here, but you can use a dict as well with all your parameters
    except KeyboardInterrupt:
        # workers convert Ctrl-C into a picklable exception so the pool can shut down
        raise KeyboardInterruptError()

if __name__ == '__main__':
    x = list(range(5))  # values to pass to the function
    results = []
    p = Pool(2)  # I use 2 but you can use as many as you have cores
    try:
        results = p.map(time_waster, x)
        p.close()
    except KeyboardInterrupt:
        p.terminate()
    finally:
        p.join()
    print(results)
As an extra service I added some keyboard-interrupt handlers, since IIRC there are some issues with interrupting pools.