Python 并行保存到 netCDF4 文件的多处理
Python multiprocessing with parallel save to netCDF4 file
我正在执行一个模拟,我想在其中保存状态向量的快照,并针对不同的参数进行计算。我有两个要扫描的控制参数(下面示例中的 p 和 a)。因此,我将仿真结果保存为一个 netCDF4 文件,其中两个维度用于两个控制参数。当我 运行 对一个参数设置的模拟正确保存文件时,但是当我尝试从 multiprocessing
运行 apply_async
时,过程结束时的 netCDF4 无法访问.
我的完整代码在这里 github repository,但基本上我想做的是:
import multiprocessing as mp
import time as timer
import netCDF4
import numpy as np
def run_sim_for_p_a(p,a,pstep,astep,step,max_time,u0,fname):
time_ar=np.arange(0,max_time,step)
u = np.ones((len(time_ar),1024))
u[0]=u0
print "Calculating for p,a:",p,a
for i,t in enumerate(time_ar[1:]):
u[i+1] = u[i]*np.cos(t)*np.sin(a)*np.sin(p)
for tstep,t in enumerate(time_ar):
save_p_a_snapshot(fname,pstep,astep,tstep,p,a,t,u[tstep]) # save the results into the netCDF4 file
def apply_async_and_save_grid(pmin,pmax,fname,
Np=10,Na=10,
step=None,max_time=500.0,numproc=10):
start = timer.time()
setup_p_a_scan(fname) # Setup a netCDF4 file for the simulations
if step is None:
step=max_time
p_range = np.linspace(pmin,pmax,Np)
init = np.random.random((1024))
a_range = np.linspace(0,1,Na)
availble_cpus = int(available_cpu_count() - 2)
numproc=min(numproc,availble_cpus)
print "Using",numproc," processors"
pool = mp.Pool(processes=numproc)
for i,p in enumerate(p_range):
for j,a in enumerate(a_range):
pool.apply_async(run_sim_for_p_a,
args = (p,a,i,j,step,max_time,init,fname))
pool.close()
pool.join()
print "Took ",timer.time()-start
if __name__=="__main__":
apply_async_and_save_grid(1.0,2.0,"test",Np=2,Na=4,step=1.0,max_time=10)
至少有两种可能的方法:
您可以让每个工作进程将其结果写入自己的 netCDF4 文件,并让主程序在所有工作进程完成后合并它们。
我不熟悉 netCDF 格式,但假设可以附加到此类文件,另一种可能性是在开始 apply_async
之前创建一个 multiprocessing.Lock
。 这个锁应该添加到工作进程的参数中。工作进程应该 acquire
锁定,打开 netcdf 文件,写入并关闭它。然后它应该 release
锁定。这将确保一次只有一个进程写入 netCDF 文件。
编辑:
请参阅 this question 的答案,了解如何使用 Pool
处理 Lock
。
我正在执行一个模拟,我想在其中保存状态向量的快照,并针对不同的参数进行计算。我有两个要扫描的控制参数(下面示例中的 p 和 a)。因此,我将仿真结果保存为一个 netCDF4 文件,其中两个维度用于两个控制参数。当我 运行 对一个参数设置的模拟正确保存文件时,但是当我尝试从 multiprocessing
运行 apply_async
时,过程结束时的 netCDF4 无法访问.
我的完整代码在这里 github repository,但基本上我想做的是:
import multiprocessing as mp
import time as timer
import netCDF4
import numpy as np
def run_sim_for_p_a(p,a,pstep,astep,step,max_time,u0,fname):
time_ar=np.arange(0,max_time,step)
u = np.ones((len(time_ar),1024))
u[0]=u0
print "Calculating for p,a:",p,a
for i,t in enumerate(time_ar[1:]):
u[i+1] = u[i]*np.cos(t)*np.sin(a)*np.sin(p)
for tstep,t in enumerate(time_ar):
save_p_a_snapshot(fname,pstep,astep,tstep,p,a,t,u[tstep]) # save the results into the netCDF4 file
def apply_async_and_save_grid(pmin,pmax,fname,
Np=10,Na=10,
step=None,max_time=500.0,numproc=10):
start = timer.time()
setup_p_a_scan(fname) # Setup a netCDF4 file for the simulations
if step is None:
step=max_time
p_range = np.linspace(pmin,pmax,Np)
init = np.random.random((1024))
a_range = np.linspace(0,1,Na)
availble_cpus = int(available_cpu_count() - 2)
numproc=min(numproc,availble_cpus)
print "Using",numproc," processors"
pool = mp.Pool(processes=numproc)
for i,p in enumerate(p_range):
for j,a in enumerate(a_range):
pool.apply_async(run_sim_for_p_a,
args = (p,a,i,j,step,max_time,init,fname))
pool.close()
pool.join()
print "Took ",timer.time()-start
if __name__=="__main__":
apply_async_and_save_grid(1.0,2.0,"test",Np=2,Na=4,step=1.0,max_time=10)
至少有两种可能的方法:
您可以让每个工作进程将其结果写入自己的 netCDF4 文件,并让主程序在所有工作进程完成后合并它们。
我不熟悉 netCDF 格式,但假设可以附加到此类文件,另一种可能性是在开始 apply_async
之前创建一个 multiprocessing.Lock
。 这个锁应该添加到工作进程的参数中。工作进程应该 acquire
锁定,打开 netcdf 文件,写入并关闭它。然后它应该 release
锁定。这将确保一次只有一个进程写入 netCDF 文件。
编辑:
请参阅 this question 的答案,了解如何使用 Pool
处理 Lock
。