如何在 Python 3 中重用进程池进行并行编程
How to reuse a process pool for parallel programming in Python 3
我是并行编程的新手。我的任务是分析数百个数据文件。这些数据中的每一个都将近 300MB,并且可以分成许多片。我的电脑是4核电脑。而且我想尽快得到每个数据的结果
每个数据文件的分析包括 2 个过程。首先将数据读入内存,然后分片成片,这是io密集型的工作。然后,对该文件的切片进行大量计算,这是 cpu 密集的。
所以我的策略是将这些文件分成 4 个一组。对于这些文件的每一组,首先,将 4 个文件的所有数据读入内存,在 4 个内核中使用 4 个进程。代码就像,
with Pool(processes=4) as pool:
data_list = pool.map(read_and_slice, files) # len(files)==4
然后对于data_list
中的每个data
,用4个进程进行计算。
for data in data_list: # I want to get the result of each data asap
with Pool(processes=4) as pool:
result_list = pool.map(compute, data.slices) # anaylyze each slice of data
analyze(result_list) # analyze the results of previous procedure, for example, get the average.
然后去另一组。
所以问题是在数百个文件的整个计算过程中,池被重新创建了很多次。我怎样才能避免重新创建池和进程的开销?我的代码中是否存在大量内存开销?有没有更好的方法让我尽可能减少所需的时间?
谢谢!
一个选项是将 with Pool
语句移到 for
循环之外……
p = Pool()
for data in data_list:
result_list = pool.map(compute, data.slices)
analyze(result_list)
p.join()
p.close()
这适用于 python 2 或 3。
如果您安装(我的模块)pathos
,然后执行 from pathos.pools import ProcessPool as Pool
,并保持其余代码与您拥有的完全相同——您将只创建一个 Pool
.这是因为 pathos
缓存了 Pool
,当创建具有相同配置的新 Pool
实例时,它只是重用现有实例。你可以做一个 pool.terminate()
来关闭它。
>>> from pathos.pools import ProcessPool as Pool
>>> pool = Pool()
>>> data_list = [range(4), range(4,8), range(8,12), range(12,16)]
>>> squared = lambda x:x**2
>>> mean = lambda x: sum(x)/len(x)
>>> for data in data_list:
... result = pool.map(squared, data)
... print mean(result)
...
3
31
91
183
实际上,pathos
使您能够执行嵌套池,因此您还可以将 for
循环转换为异步映射(amap
来自 pathos
)……并且由于内部映射不需要保留顺序,因此您可以使用无序映射迭代器(multiprocessing
中的imap_unordered
,或pathos
中的uimap
)。例如,请参见此处:
在这里:
唯一的遗憾是 pathos
是 python2
。但很快(待发布)将完全转换为 python3
.
我是并行编程的新手。我的任务是分析数百个数据文件。这些数据中的每一个都将近 300MB,并且可以分成许多片。我的电脑是4核电脑。而且我想尽快得到每个数据的结果
每个数据文件的分析包括 2 个过程。首先将数据读入内存,然后分片成片,这是io密集型的工作。然后,对该文件的切片进行大量计算,这是 cpu 密集的。
所以我的策略是将这些文件分成 4 个一组。对于这些文件的每一组,首先,将 4 个文件的所有数据读入内存,在 4 个内核中使用 4 个进程。代码就像,
with Pool(processes=4) as pool:
data_list = pool.map(read_and_slice, files) # len(files)==4
然后对于data_list
中的每个data
,用4个进程进行计算。
for data in data_list: # I want to get the result of each data asap
with Pool(processes=4) as pool:
result_list = pool.map(compute, data.slices) # anaylyze each slice of data
analyze(result_list) # analyze the results of previous procedure, for example, get the average.
然后去另一组。
所以问题是在数百个文件的整个计算过程中,池被重新创建了很多次。我怎样才能避免重新创建池和进程的开销?我的代码中是否存在大量内存开销?有没有更好的方法让我尽可能减少所需的时间?
谢谢!
一个选项是将 with Pool
语句移到 for
循环之外……
p = Pool()
for data in data_list:
result_list = pool.map(compute, data.slices)
analyze(result_list)
p.join()
p.close()
这适用于 python 2 或 3。
如果您安装(我的模块)pathos
,然后执行 from pathos.pools import ProcessPool as Pool
,并保持其余代码与您拥有的完全相同——您将只创建一个 Pool
.这是因为 pathos
缓存了 Pool
,当创建具有相同配置的新 Pool
实例时,它只是重用现有实例。你可以做一个 pool.terminate()
来关闭它。
>>> from pathos.pools import ProcessPool as Pool
>>> pool = Pool()
>>> data_list = [range(4), range(4,8), range(8,12), range(12,16)]
>>> squared = lambda x:x**2
>>> mean = lambda x: sum(x)/len(x)
>>> for data in data_list:
... result = pool.map(squared, data)
... print mean(result)
...
3
31
91
183
实际上,pathos
使您能够执行嵌套池,因此您还可以将 for
循环转换为异步映射(amap
来自 pathos
)……并且由于内部映射不需要保留顺序,因此您可以使用无序映射迭代器(multiprocessing
中的imap_unordered
,或pathos
中的uimap
)。例如,请参见此处:
在这里:
唯一的遗憾是 pathos
是 python2
。但很快(待发布)将完全转换为 python3
.