multiprocessing中map与Pool结合使用时如何划分数据?

How does map divide data when used in conjunction with Pool in multiprocessing?

我有一个函数 f,我想在某个大数据上并行计算。数据可以以多种方式划分,我正在尝试就如何划分它做出决定。我试图了解 "map" 在 multiprocessing.Pool distribute/divide 中的数据,以便我在拆分数据和选择处理器数量方面做出正确的决定。我的输入数据不仅仅是一个列表,如下例所示,而是字典列表和列表列表,因此了解 Pool.map 如何划分数据似乎很重要。

话虽这么说,我认为理解简单的例子会告诉我们更复杂的例子。

以下脚本显示我们正在选择一个包含 5 个进程的池和 [1,2,3] 中的数据。这里为划分数据所做的隐式选择是什么?

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

我天真的理解是,Pool 只是简单地按顺序处理输入列表,将前 'n' 个元素发送到池中,然后在第一个进程再次可用后,那个进程获取下一个元素,直到有没有更多的元素。最后它等待所有元素完成后再返回。

你应该用一个列表做一个实验:[2,2,2,5,2,2,2] 和一个函数:

def f(x):
    sleep(x)
    return x * x

它没有记录,所以你不应该依赖任何特定的行为。您可以通过传递可选的 chunksize= 参数来强制执行它。如果您不这样做,则会使用启发式方法为您构成 chunksize 的值。这可以在您的源代码树 Lib/multiprocessing/Pool.py:

的私有函数 _map_async() 中找到
def _map_async(self, func, iterable, mapper, chunksize=None, ...
    '''
    Helper function to implement map, starmap and their async counterparts.
    '''
    ...
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
    ...

len(self._pool)是worker进程数。因此,默认情况下,如果工作项少于进程数的 4 倍,则一次传递一个。您的具体示例 (3 <= 4*5) 就是这种情况。如果工作项比进程多得多,则选择块大小,以便每个进程在 map() 的生命周期内将工作块交给大约 4​​ 次。例如,如果您的列表中有 500 个项目,500 / (5*4) == 25,那么一次将有 25 个项目传递给工作进程。

为什么不一次调用 100 个,这样 5 个 worker 中的每个都只被调用一次?因为它是一种启发式 ;-) 传递少于该值是一种权衡,平衡需要完成进程间通信的次数与负载平衡(不同的工作项可能需要不同的时间才能完成)。但是关于负载平衡的任何事情都无法提前知道,因此启发式算法给予更多(但不是绝对!)权重以保持进程间调用的数量较低。

这就是它没有被记录的原因。很可能有一天会使用更智能的启发式算法。

可以看到multiprocessing.Pool.map处理进程间分工的方式here

简而言之,它将把给定的可迭代对象分成块,块的大小是可迭代对象的大小除以工人数量时间 4。

在你的具体例子中:

In [1]: chunksize, extra = divmod(len([1,2,3]), 5 * 4)
In [2]: if extra:
   ...:     chunksize += 1
   ...:     
In [3]: chunksize
Out[3]: 1

它将产生三个大小为 1 的块。

您可以通过 chunksize 参数自行控制块大小。