multiprocessing中map与Pool结合使用时如何划分数据?
How does map divide data when used in conjunction with Pool in multiprocessing?
我有一个函数 f,我想在某个大数据上并行计算。数据可以以多种方式划分,我正在尝试就如何划分它做出决定。我试图了解 "map" 在 multiprocessing.Pool distribute/divide 中的数据,以便我在拆分数据和选择处理器数量方面做出正确的决定。我的输入数据不仅仅是一个列表,如下例所示,而是字典列表和列表列表,因此了解 Pool.map 如何划分数据似乎很重要。
话虽这么说,我认为理解简单的例子会告诉我们更复杂的例子。
以下脚本显示我们正在选择一个包含 5 个进程的池和 [1,2,3] 中的数据。这里为划分数据所做的隐式选择是什么?
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [1, 2, 3]))
我天真的理解是,Pool 只是简单地按顺序处理输入列表,将前 'n' 个元素发送到池中,然后在第一个进程再次可用后,那个进程获取下一个元素,直到有没有更多的元素。最后它等待所有元素完成后再返回。
你应该用一个列表做一个实验:[2,2,2,5,2,2,2] 和一个函数:
def f(x):
sleep(x)
return x * x
它没有记录,所以你不应该依赖任何特定的行为。您可以通过传递可选的 chunksize=
参数来强制执行它。如果您不这样做,则会使用启发式方法为您构成 chunksize 的值。这可以在您的源代码树 Lib/multiprocessing/Pool.py
:
的私有函数 _map_async()
中找到
def _map_async(self, func, iterable, mapper, chunksize=None, ...
'''
Helper function to implement map, starmap and their async counterparts.
'''
...
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
...
len(self._pool)
是worker进程数。因此,默认情况下,如果工作项少于进程数的 4 倍,则一次传递一个。您的具体示例 (3 <= 4*5
) 就是这种情况。如果工作项比进程多得多,则选择块大小,以便每个进程在 map()
的生命周期内将工作块交给大约 4 次。例如,如果您的列表中有 500 个项目,500 / (5*4) == 25
,那么一次将有 25 个项目传递给工作进程。
为什么不一次调用 100 个,这样 5 个 worker 中的每个都只被调用一次?因为它是一种启发式 ;-) 传递少于该值是一种权衡,平衡需要完成进程间通信的次数与负载平衡(不同的工作项可能需要不同的时间才能完成)。但是关于负载平衡的任何事情都无法提前知道,因此启发式算法给予更多(但不是绝对!)权重以保持进程间调用的数量较低。
这就是它没有被记录的原因。很可能有一天会使用更智能的启发式算法。
可以看到multiprocessing.Pool.map
处理进程间分工的方式here。
简而言之,它将把给定的可迭代对象分成块,块的大小是可迭代对象的大小除以工人数量时间 4。
在你的具体例子中:
In [1]: chunksize, extra = divmod(len([1,2,3]), 5 * 4)
In [2]: if extra:
...: chunksize += 1
...:
In [3]: chunksize
Out[3]: 1
它将产生三个大小为 1 的块。
您可以通过 chunksize
参数自行控制块大小。
我有一个函数 f,我想在某个大数据上并行计算。数据可以以多种方式划分,我正在尝试就如何划分它做出决定。我试图了解 "map" 在 multiprocessing.Pool distribute/divide 中的数据,以便我在拆分数据和选择处理器数量方面做出正确的决定。我的输入数据不仅仅是一个列表,如下例所示,而是字典列表和列表列表,因此了解 Pool.map 如何划分数据似乎很重要。
话虽这么说,我认为理解简单的例子会告诉我们更复杂的例子。
以下脚本显示我们正在选择一个包含 5 个进程的池和 [1,2,3] 中的数据。这里为划分数据所做的隐式选择是什么?
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [1, 2, 3]))
我天真的理解是,Pool 只是简单地按顺序处理输入列表,将前 'n' 个元素发送到池中,然后在第一个进程再次可用后,那个进程获取下一个元素,直到有没有更多的元素。最后它等待所有元素完成后再返回。
你应该用一个列表做一个实验:[2,2,2,5,2,2,2] 和一个函数:
def f(x):
sleep(x)
return x * x
它没有记录,所以你不应该依赖任何特定的行为。您可以通过传递可选的 chunksize=
参数来强制执行它。如果您不这样做,则会使用启发式方法为您构成 chunksize 的值。这可以在您的源代码树 Lib/multiprocessing/Pool.py
:
_map_async()
中找到
def _map_async(self, func, iterable, mapper, chunksize=None, ...
'''
Helper function to implement map, starmap and their async counterparts.
'''
...
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
...
len(self._pool)
是worker进程数。因此,默认情况下,如果工作项少于进程数的 4 倍,则一次传递一个。您的具体示例 (3 <= 4*5
) 就是这种情况。如果工作项比进程多得多,则选择块大小,以便每个进程在 map()
的生命周期内将工作块交给大约 4 次。例如,如果您的列表中有 500 个项目,500 / (5*4) == 25
,那么一次将有 25 个项目传递给工作进程。
为什么不一次调用 100 个,这样 5 个 worker 中的每个都只被调用一次?因为它是一种启发式 ;-) 传递少于该值是一种权衡,平衡需要完成进程间通信的次数与负载平衡(不同的工作项可能需要不同的时间才能完成)。但是关于负载平衡的任何事情都无法提前知道,因此启发式算法给予更多(但不是绝对!)权重以保持进程间调用的数量较低。
这就是它没有被记录的原因。很可能有一天会使用更智能的启发式算法。
可以看到multiprocessing.Pool.map
处理进程间分工的方式here。
简而言之,它将把给定的可迭代对象分成块,块的大小是可迭代对象的大小除以工人数量时间 4。
在你的具体例子中:
In [1]: chunksize, extra = divmod(len([1,2,3]), 5 * 4)
In [2]: if extra:
...: chunksize += 1
...:
In [3]: chunksize
Out[3]: 1
它将产生三个大小为 1 的块。
您可以通过 chunksize
参数自行控制块大小。