子数组的多进程填充,通过合并每个进程的所有子数组来构建全局数组

Multi-processing filling of sub-arrays to build a global array by merging all sub-arrays of each process

在 Python 2.7 中,我必须构建一个二维数组 (arrayFullCross_final[u][u]),其中包含 16 个块,每个块的大小为 100x100 个元素。一开始,我使用了一个 4D 数组 (arrayFullCross),然后我将它重新整形为 400x400 2D 数组。

我有第一个版本(顺序),我在其中使用经典的 python 函数“映射”和这样的“生成器”(buildCrossMatrix_loop 是我想要应用的函数生成器 generatorCrossMatrix) :

# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))


def buildCrossMatrix_loop(params_array):

  # rows indices
  xb = params_array[0]
  # columns indices
  yb = params_array[1]
  # Current redshift
  z = zrange[params_array[2]]

  # Loop inside block
  for ub in range(dimPoints):
    for vb in range(dimPoints):
      # Diagonal terms
      if (xb == yb):
        #arrayFullCross[u][u][w][t] = 2*P_bgs**2 * N_bgs**2
        if (xb == 0):
          N_bgs = (1+1/(n[params_array[2]]*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])), 10**P_m_NW(np.log10(arrayCross_k[ub])), bias2D_array*sig_8_fid, growth_f[params_array[2]]*sig_8_fid, H_orig(z), H_orig(z), D_A_orig(z), D_A_orig(z), params_array[2], 0, 0)))

          arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])), 10**P_m_NW(np.log10(arrayCross_k[ub])), bias2D_array*sig_8_fid, growth_f[params_array[2]]*sig_8_fid, H_orig(z), H_orig(z), D_A_orig(z), D_A_orig(z), params_array[2], 0, 0)**2 * N_bgs**2
...
...

##### MAIN LOOP to fill, at each index i, the array "arrayFullCross" #####
while i < len(zrange):

  ...
  ...

  def generatorCrossMatrix(index):
    for igen in range(dimBlocks):
      for lgen in range(dimBlocks):
        yield igen, lgen, index


  if __name__ == '__main__':
      map(buildCrossMatrix_loop, generatorCrossMatrix(i))
  
  ...      
  ...

i = i+1   

i 只是主循环“while”的索引。

使用这种顺序方法,一切正常,我得到了预期的大输出数组 arrayFullCross[u][v][x][y](我检查了其中的值,在按 400x400 整形后,它很好)。

现在,我尝试用 multiprocessing import Pool 做同样的事情。我做到了:

from multiprocessing import Pool

def buildCrossMatrix_loop(params_array):
...

while i < len(zrange):
...

if __name__ == '__main__':          
      pool = mp.Pool(16)
      pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
      pool.terminate()

      # Reshape 4D array to 2D global array
      arrayFullCross2D_final = arrayFullCross.swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)

      print 'arrayFullCross2D_final = ', arrayFullCross2D_final

但是当我打印最终输出的二维数组时 arrayFullCross2D_final,我系统地得到一个只填充零值的数组。

arrayFullCross2D_final =  [[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
 ...
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]]

也许我必须在不同进程之间共享 4D 数组 arrayFullCross?我该如何执行此操作?

每个进程如何同时修改 4D 数组的不同部分?

似乎这个 4D 全局数组被循环的每个 i 索引覆盖。

更新 1

我忘了说我已经像这样声明了完整的数组(在 main 的开头,即在 while 循环之外):

# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))

如何使用给出的答案和我的 arrayFullCross 声明的解决方案?即:

manager = Manager()
arrayFullCross = manager.list()

更新 2

我虽然通过使用 ThreadPoolfrom multiprocessing.dummy import Pool as ThreadPool 找到了一个很好的解决方案,方法是:

pool = ThreadPool(16)
pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
pool.close()
pool.join()

但是性能好像不好: 确实,我只看到一个进程运行 top or htop 命令,正常吗?

似乎大部分时间都花在了锁定全局数组以写入其中:这不是必需的,因为我在独立的子数组上填充了全局数组。

我可以使用 ThreadPool 吗?

代码似乎确实是正确的。但是,当您 运行 它处于池模式时,每个工作人员都将拥有自己的数组副本。然后他们将写回您从未接触过的共享内存副本,因此 table 填充为 0.

通过在 multiprocessing 模块中使用共享内存变量,您应该能够与主线程共享结果。您可以使用 c 类型数组,但这会大大增加代码的复杂性。 multiprocessing 模块通过 Manager 子模块提供类似于 python 的列表。足以使 arrayFullCross 成为 Manager 列表:

from multiprocessing import Manager, Pool
manager = Manager()
arrayFullCross = manager.list()

def buildCrossMatrix_loop(params_array):
...

while i < len(zrange):
...

if __name__ == '__main__':          
      pool = mp.Pool(16)
      pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
      pool.terminate()

      # Reshape 4D array to 2D global array
      arrayFullCross2D_final = arrayFullCross.swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)

      print 'arrayFullCross2D_final = ', arrayFullCross2D_final

值得注意的是,使用 manager 对象会产生一定程度的开销。如果性能不令您满意,请尝试使用 multiprocessing.

中的 Array 类型

可以在 multiprocessing docs 中找到有关这些资源的更多信息。