如何组织多处理池以收集字典中的输出
How to organize multiprocessing pool for collecting output in dictionary
我试图使用下面的可重现的小例子来重现我的“现实世界”问题的本质。这个例子试图利用我发现的功能 here。现实世界的例子需要 16 天,在我的笔记本电脑上使用一个内核,它有 16 个内核,所以考虑到大多数内核,我希望将我的运行时间缩短到一两天。但是,首先,我需要了解我在下面的小示例中做错了什么。
该示例首先设置一个名为 all_combos
的元组列表。这个想法是然后将 all_combos
中的每个元组传递给函数 do_one_run()
。我的目标是使用多重处理并行化 do_one_run()
。不幸的是,下面的小可重现示例会返回我无法解决的错误消息。我怀疑我误解了 Pool
的工作原理,特别是将每个参数元组映射到 do_one_run()
的参数,或者我误解了如何收集 do_one_run()
的输出],或者更可能是两者?
非常欢迎任何见解!
import random
import numpy as np
import multiprocessing as mp
slns = {}
var1 = [5, 6, 7]
var2 = [2, 3, 4]
var3 = [10, 9, 8]
all_combos = []
key = 0
for v1 in var1:
for v2 in var2:
for v3 in var3:
all_combos.append([key, v1, v2, v3])
key += 1
def example_func(v1_passed, v2_passed, v3_passed):
tmp = np.random.random((v1_passed, v2_passed, v3_passed))*100
my_arr = tmp.astype(int)
piece_arr = my_arr[1,:,1:3]
return piece_arr
def do_one_run(key, v1_passed, v2_passed, v3_passed):
results = example_func(v1_passed, v2_passed, v3_passed)
slns.update({key: [v1_passed, v2_passed, v3_passed, results]})
pool = mp.Pool(4) # 4 cores devoted to job?
result = pool.starmap(do_one_run, all_combos)
您不能通过多处理共享 slns
之类的变量。您必须从 do_one_run
函数收集所有 return 值:
import random
import numpy as np
import multiprocessing as mp
# slns = {} <- Remove this line
...
# Return result
def do_one_run(key, v1_passed, v2_passed, v3_passed):
results = example_func(v1_passed, v2_passed, v3_passed)
return key, [v1_passed, v2_passed, v3_passed, results]
if __name__ == '__main__':
with mp.Pool(4) as pool:
results = pool.starmap(do_one_run, all_combos) # <- Collect results
result = dict(itertools.chain(*map(dict.items, result))) # <- Merge results
>>> result
{0: [5,
2,
10,
array([[77, 90],
[34, 28]])],
1: [5,
2,
9,
array([[64, 43],
[45, 53]])],
2: [5,
2,
8,
array([[ 8, 78],
[39, 3]])],
...
}
将最后两行更改为:-
if __name__ == '__main__':
mp.Pool().starmap(do_one_run, all_combos)
print('Done') # So you know when it's finished
您可能还会发现此讨论有帮助:- python multiprocessing on windows, if __name__ == "__main__"
另请注意,在此示例中构造的 Pool() 没有参数。这样,底层实现将充分利用 CPU 架构,运行
我试图使用下面的可重现的小例子来重现我的“现实世界”问题的本质。这个例子试图利用我发现的功能 here。现实世界的例子需要 16 天,在我的笔记本电脑上使用一个内核,它有 16 个内核,所以考虑到大多数内核,我希望将我的运行时间缩短到一两天。但是,首先,我需要了解我在下面的小示例中做错了什么。
该示例首先设置一个名为 all_combos
的元组列表。这个想法是然后将 all_combos
中的每个元组传递给函数 do_one_run()
。我的目标是使用多重处理并行化 do_one_run()
。不幸的是,下面的小可重现示例会返回我无法解决的错误消息。我怀疑我误解了 Pool
的工作原理,特别是将每个参数元组映射到 do_one_run()
的参数,或者我误解了如何收集 do_one_run()
的输出],或者更可能是两者?
非常欢迎任何见解!
import random
import numpy as np
import multiprocessing as mp
slns = {}
var1 = [5, 6, 7]
var2 = [2, 3, 4]
var3 = [10, 9, 8]
all_combos = []
key = 0
for v1 in var1:
for v2 in var2:
for v3 in var3:
all_combos.append([key, v1, v2, v3])
key += 1
def example_func(v1_passed, v2_passed, v3_passed):
tmp = np.random.random((v1_passed, v2_passed, v3_passed))*100
my_arr = tmp.astype(int)
piece_arr = my_arr[1,:,1:3]
return piece_arr
def do_one_run(key, v1_passed, v2_passed, v3_passed):
results = example_func(v1_passed, v2_passed, v3_passed)
slns.update({key: [v1_passed, v2_passed, v3_passed, results]})
pool = mp.Pool(4) # 4 cores devoted to job?
result = pool.starmap(do_one_run, all_combos)
您不能通过多处理共享 slns
之类的变量。您必须从 do_one_run
函数收集所有 return 值:
import random
import numpy as np
import multiprocessing as mp
# slns = {} <- Remove this line
...
# Return result
def do_one_run(key, v1_passed, v2_passed, v3_passed):
results = example_func(v1_passed, v2_passed, v3_passed)
return key, [v1_passed, v2_passed, v3_passed, results]
if __name__ == '__main__':
with mp.Pool(4) as pool:
results = pool.starmap(do_one_run, all_combos) # <- Collect results
result = dict(itertools.chain(*map(dict.items, result))) # <- Merge results
>>> result
{0: [5,
2,
10,
array([[77, 90],
[34, 28]])],
1: [5,
2,
9,
array([[64, 43],
[45, 53]])],
2: [5,
2,
8,
array([[ 8, 78],
[39, 3]])],
...
}
将最后两行更改为:-
if __name__ == '__main__':
mp.Pool().starmap(do_one_run, all_combos)
print('Done') # So you know when it's finished
您可能还会发现此讨论有帮助:- python multiprocessing on windows, if __name__ == "__main__"
另请注意,在此示例中构造的 Pool() 没有参数。这样,底层实现将充分利用 CPU 架构,运行