子数组的多进程填充,通过合并每个进程的所有子数组来构建全局数组
Multi-processing filling of sub-arrays to build a global array by merging all sub-arrays of each process
在 Python 2.7 中,我必须构建一个二维数组 (arrayFullCross_final[u][u]
),其中包含 16 个块,每个块的大小为 100x100 个元素。一开始,我使用了一个 4D 数组 (arrayFullCross
),然后我将它重新整形为 400x400 2D 数组。
我有第一个版本(顺序),我在其中使用经典的 python 函数“映射”和这样的“生成器”(buildCrossMatrix_loop
是我想要应用的函数生成器 generatorCrossMatrix
) :
# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))
def buildCrossMatrix_loop(params_array):
# rows indices
xb = params_array[0]
# columns indices
yb = params_array[1]
# Current redshift
z = zrange[params_array[2]]
# Loop inside block
for ub in range(dimPoints):
for vb in range(dimPoints):
# Diagonal terms
if (xb == yb):
#arrayFullCross[u][u][w][t] = 2*P_bgs**2 * N_bgs**2
if (xb == 0):
N_bgs = (1+1/(n[params_array[2]]*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])), 10**P_m_NW(np.log10(arrayCross_k[ub])), bias2D_array*sig_8_fid, growth_f[params_array[2]]*sig_8_fid, H_orig(z), H_orig(z), D_A_orig(z), D_A_orig(z), params_array[2], 0, 0)))
arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])), 10**P_m_NW(np.log10(arrayCross_k[ub])), bias2D_array*sig_8_fid, growth_f[params_array[2]]*sig_8_fid, H_orig(z), H_orig(z), D_A_orig(z), D_A_orig(z), params_array[2], 0, 0)**2 * N_bgs**2
...
...
##### MAIN LOOP to fill, at each index i, the array "arrayFullCross" #####
while i < len(zrange):
...
...
def generatorCrossMatrix(index):
for igen in range(dimBlocks):
for lgen in range(dimBlocks):
yield igen, lgen, index
if __name__ == '__main__':
map(buildCrossMatrix_loop, generatorCrossMatrix(i))
...
...
i = i+1
i
只是主循环“while”的索引。
使用这种顺序方法,一切正常,我得到了预期的大输出数组 arrayFullCross[u][v][x][y]
(我检查了其中的值,在按 400x400 整形后,它很好)。
现在,我尝试用 multiprocessing import Pool
做同样的事情。我做到了:
from multiprocessing import Pool
def buildCrossMatrix_loop(params_array):
...
while i < len(zrange):
...
if __name__ == '__main__':
pool = mp.Pool(16)
pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
pool.terminate()
# Reshape 4D array to 2D global array
arrayFullCross2D_final = arrayFullCross.swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)
print 'arrayFullCross2D_final = ', arrayFullCross2D_final
但是当我打印最终输出的二维数组时 arrayFullCross2D_final
,我系统地得到一个只填充零值的数组。
arrayFullCross2D_final = [[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
...
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]]
也许我必须在不同进程之间共享 4D 数组 arrayFullCross
?我该如何执行此操作?
每个进程如何同时修改 4D 数组的不同部分?
似乎这个 4D 全局数组被循环的每个 i
索引覆盖。
更新 1
我忘了说我已经像这样声明了完整的数组(在 main 的开头,即在 while
循环之外):
# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))
如何使用给出的答案和我的 arrayFullCross 声明的解决方案?即:
manager = Manager()
arrayFullCross = manager.list()
更新 2
我虽然通过使用 ThreadPool
和 from multiprocessing.dummy import Pool as ThreadPool
找到了一个很好的解决方案,方法是:
pool = ThreadPool(16)
pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
pool.close()
pool.join()
但是性能好像不好: 确实,我只看到一个进程运行 top
or htop
命令,正常吗?
似乎大部分时间都花在了锁定全局数组以写入其中:这不是必需的,因为我在独立的子数组上填充了全局数组。
我可以使用 ThreadPool
吗?
代码似乎确实是正确的。但是,当您 运行 它处于池模式时,每个工作人员都将拥有自己的数组副本。然后他们将写回您从未接触过的共享内存副本,因此 table 填充为 0.
通过在 multiprocessing
模块中使用共享内存变量,您应该能够与主线程共享结果。您可以使用 c 类型数组,但这会大大增加代码的复杂性。 multiprocessing
模块通过 Manager
子模块提供类似于 python 的列表。足以使 arrayFullCross
成为 Manager
列表:
from multiprocessing import Manager, Pool
manager = Manager()
arrayFullCross = manager.list()
def buildCrossMatrix_loop(params_array):
...
while i < len(zrange):
...
if __name__ == '__main__':
pool = mp.Pool(16)
pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
pool.terminate()
# Reshape 4D array to 2D global array
arrayFullCross2D_final = arrayFullCross.swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)
print 'arrayFullCross2D_final = ', arrayFullCross2D_final
值得注意的是,使用 manager
对象会产生一定程度的开销。如果性能不令您满意,请尝试使用 multiprocessing
.
中的 Array 类型
可以在 multiprocessing docs 中找到有关这些资源的更多信息。
在 Python 2.7 中,我必须构建一个二维数组 (arrayFullCross_final[u][u]
),其中包含 16 个块,每个块的大小为 100x100 个元素。一开始,我使用了一个 4D 数组 (arrayFullCross
),然后我将它重新整形为 400x400 2D 数组。
我有第一个版本(顺序),我在其中使用经典的 python 函数“映射”和这样的“生成器”(buildCrossMatrix_loop
是我想要应用的函数生成器 generatorCrossMatrix
) :
# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))
def buildCrossMatrix_loop(params_array):
# rows indices
xb = params_array[0]
# columns indices
yb = params_array[1]
# Current redshift
z = zrange[params_array[2]]
# Loop inside block
for ub in range(dimPoints):
for vb in range(dimPoints):
# Diagonal terms
if (xb == yb):
#arrayFullCross[u][u][w][t] = 2*P_bgs**2 * N_bgs**2
if (xb == 0):
N_bgs = (1+1/(n[params_array[2]]*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])), 10**P_m_NW(np.log10(arrayCross_k[ub])), bias2D_array*sig_8_fid, growth_f[params_array[2]]*sig_8_fid, H_orig(z), H_orig(z), D_A_orig(z), D_A_orig(z), params_array[2], 0, 0)))
arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])), 10**P_m_NW(np.log10(arrayCross_k[ub])), bias2D_array*sig_8_fid, growth_f[params_array[2]]*sig_8_fid, H_orig(z), H_orig(z), D_A_orig(z), D_A_orig(z), params_array[2], 0, 0)**2 * N_bgs**2
...
...
##### MAIN LOOP to fill, at each index i, the array "arrayFullCross" #####
while i < len(zrange):
...
...
def generatorCrossMatrix(index):
for igen in range(dimBlocks):
for lgen in range(dimBlocks):
yield igen, lgen, index
if __name__ == '__main__':
map(buildCrossMatrix_loop, generatorCrossMatrix(i))
...
...
i = i+1
i
只是主循环“while”的索引。
使用这种顺序方法,一切正常,我得到了预期的大输出数组 arrayFullCross[u][v][x][y]
(我检查了其中的值,在按 400x400 整形后,它很好)。
现在,我尝试用 multiprocessing import Pool
做同样的事情。我做到了:
from multiprocessing import Pool
def buildCrossMatrix_loop(params_array):
...
while i < len(zrange):
...
if __name__ == '__main__':
pool = mp.Pool(16)
pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
pool.terminate()
# Reshape 4D array to 2D global array
arrayFullCross2D_final = arrayFullCross.swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)
print 'arrayFullCross2D_final = ', arrayFullCross2D_final
但是当我打印最终输出的二维数组时 arrayFullCross2D_final
,我系统地得到一个只填充零值的数组。
arrayFullCross2D_final = [[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
...
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]]
也许我必须在不同进程之间共享 4D 数组 arrayFullCross
?我该如何执行此操作?
每个进程如何同时修改 4D 数组的不同部分?
似乎这个 4D 全局数组被循环的每个 i
索引覆盖。
更新 1
我忘了说我已经像这样声明了完整的数组(在 main 的开头,即在 while
循环之外):
# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))
如何使用给出的答案和我的 arrayFullCross 声明的解决方案?即:
manager = Manager()
arrayFullCross = manager.list()
更新 2
我虽然通过使用 ThreadPool
和 from multiprocessing.dummy import Pool as ThreadPool
找到了一个很好的解决方案,方法是:
pool = ThreadPool(16)
pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
pool.close()
pool.join()
但是性能好像不好: 确实,我只看到一个进程运行 top
or htop
命令,正常吗?
似乎大部分时间都花在了锁定全局数组以写入其中:这不是必需的,因为我在独立的子数组上填充了全局数组。
我可以使用 ThreadPool
吗?
代码似乎确实是正确的。但是,当您 运行 它处于池模式时,每个工作人员都将拥有自己的数组副本。然后他们将写回您从未接触过的共享内存副本,因此 table 填充为 0.
通过在 multiprocessing
模块中使用共享内存变量,您应该能够与主线程共享结果。您可以使用 c 类型数组,但这会大大增加代码的复杂性。 multiprocessing
模块通过 Manager
子模块提供类似于 python 的列表。足以使 arrayFullCross
成为 Manager
列表:
from multiprocessing import Manager, Pool
manager = Manager()
arrayFullCross = manager.list()
def buildCrossMatrix_loop(params_array):
...
while i < len(zrange):
...
if __name__ == '__main__':
pool = mp.Pool(16)
pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
pool.terminate()
# Reshape 4D array to 2D global array
arrayFullCross2D_final = arrayFullCross.swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)
print 'arrayFullCross2D_final = ', arrayFullCross2D_final
值得注意的是,使用 manager
对象会产生一定程度的开销。如果性能不令您满意,请尝试使用 multiprocessing
.
可以在 multiprocessing docs 中找到有关这些资源的更多信息。