并行和线性嵌套 for 循环之间的不匹配
Mismatch between parallelized and linear nested for loops
我想并行化类似于以下内容的一段代码:
Ngal=10
sampind=[7,16,22,31,45]
samples=0.3*np.ones((60,Ngal))
zt=[2.15,7.16,1.23,3.05,4.1,2.09,1.324,3.112,0.032,0.2356]
toavg=[]
for j in range(Ngal):
gal=[]
for m in sampind:
gal.append(samples[m][j]-zt[j])
toavg.append(np.mean(gal))
accuracy=np.mean(toavg)
所以我听从了建议here并重写如下:
toavg=[]
gal=[]
p = mp.Pool()
def deltaz(params):
j=params[0] # index of the galaxy
m=params[1] # indices for which we have sampled redshifts
gal.append(samples[m][j]-zt[j])
return np.mean(gal)
j=(np.linspace(0,Ngal-1,Ngal).astype(int))
m=sampind
grid=[j,m]
input=itertools.product(*grid)
results = p.map(deltaz,input)
accuracy=np.mean(results)
p.close()
p.join()
但结果不一样。事实上,有时是,有时不是。它似乎不是很确定。我的方法正确吗?如果没有,我应该修复什么?谢谢!重现上述示例所需的模块是:
import numpy as np
import multiprocess as mp
import itertools
谢谢!
我看到的第一个问题是您正在创建一个全局变量 gal,函数 deltaz 正在访问该变量。然而,这些不在池进程之间共享,而是为每个进程单独实例化。如果你想让他们共享这个结构,你将不得不使用共享内存。这可能就是您看到不确定行为的原因。
下一个问题是您实际上并没有用不同的变体完成相同的任务。您要取每组平均值 (gal) 的平均值的第一个。并行的是对恰好出现在该列表中的所有元素取平均值。这是不确定的,因为项目在可用时分配给流程,这不一定是可预测的。
我建议将内循环并行化。为此,您需要将 zt 和 samples 都放在共享内存中,因为它们会被所有进程访问。如果您正在修改数据,这可能会很危险,但由于您似乎只是在阅读,所以应该没问题。
import numpy as np
import multiprocessing as mp
import itertools
import ctypes
#Non-parallel code
Ngal=10
sampind=[7,16,22,31,45]
samples=0.3*np.ones((60,Ngal))
zt=[2.15,7.16,1.23,3.05,4.1,2.09,1.324,3.112,0.032,0.2356]
#Nonparallel
toavg=[]
for j in range(Ngal):
gal=[]
for m in sampind:
gal.append(samples[m][j]-zt[j])
toavg.append(np.mean(gal))
accuracy=np.mean(toavg)
print(toavg)
# Parallel function
def deltaz(j):
sampind=[7,16,22,31,45]
gal = []
for m in sampind:
gal.append(samples[m][j]-zt[j])
return np.mean(gal)
# Shared array for zt
zt_base = mp.Array(ctypes.c_double, int(len(zt)),lock=False)
ztArr = np.ctypeslib.as_array(zt_base)
#Shared array for samples
sample_base = mp.Array(ctypes.c_double, int(np.product(samples.shape)),lock=False)
sampArr = np.ctypeslib.as_array(sample_base)
sampArr = sampArr.reshape(samples.shape)
#Copy arrays to shared
sampArr[:,:] = samples[:,:]
ztArr[:] = zt[:]
with mp.Pool() as p:
result = p.map(deltaz,(np.linspace(0,Ngal-1,Ngal).astype(int)))
print(result)
这是一个产生相同结果的例子。您可以根据需要为此增加更多的复杂性,但我会阅读一般的多处理和内存 types/scopes 以了解什么会起作用,什么不会起作用。当您进入多处理世界时,您必须更加小心。如果这没有帮助,请告诉我,我会尝试对其进行更新以使其有效。
我想并行化类似于以下内容的一段代码:
Ngal=10
sampind=[7,16,22,31,45]
samples=0.3*np.ones((60,Ngal))
zt=[2.15,7.16,1.23,3.05,4.1,2.09,1.324,3.112,0.032,0.2356]
toavg=[]
for j in range(Ngal):
gal=[]
for m in sampind:
gal.append(samples[m][j]-zt[j])
toavg.append(np.mean(gal))
accuracy=np.mean(toavg)
所以我听从了建议here并重写如下:
toavg=[]
gal=[]
p = mp.Pool()
def deltaz(params):
j=params[0] # index of the galaxy
m=params[1] # indices for which we have sampled redshifts
gal.append(samples[m][j]-zt[j])
return np.mean(gal)
j=(np.linspace(0,Ngal-1,Ngal).astype(int))
m=sampind
grid=[j,m]
input=itertools.product(*grid)
results = p.map(deltaz,input)
accuracy=np.mean(results)
p.close()
p.join()
但结果不一样。事实上,有时是,有时不是。它似乎不是很确定。我的方法正确吗?如果没有,我应该修复什么?谢谢!重现上述示例所需的模块是:
import numpy as np
import multiprocess as mp
import itertools
谢谢!
我看到的第一个问题是您正在创建一个全局变量 gal,函数 deltaz 正在访问该变量。然而,这些不在池进程之间共享,而是为每个进程单独实例化。如果你想让他们共享这个结构,你将不得不使用共享内存。这可能就是您看到不确定行为的原因。
下一个问题是您实际上并没有用不同的变体完成相同的任务。您要取每组平均值 (gal) 的平均值的第一个。并行的是对恰好出现在该列表中的所有元素取平均值。这是不确定的,因为项目在可用时分配给流程,这不一定是可预测的。
我建议将内循环并行化。为此,您需要将 zt 和 samples 都放在共享内存中,因为它们会被所有进程访问。如果您正在修改数据,这可能会很危险,但由于您似乎只是在阅读,所以应该没问题。
import numpy as np
import multiprocessing as mp
import itertools
import ctypes
#Non-parallel code
Ngal=10
sampind=[7,16,22,31,45]
samples=0.3*np.ones((60,Ngal))
zt=[2.15,7.16,1.23,3.05,4.1,2.09,1.324,3.112,0.032,0.2356]
#Nonparallel
toavg=[]
for j in range(Ngal):
gal=[]
for m in sampind:
gal.append(samples[m][j]-zt[j])
toavg.append(np.mean(gal))
accuracy=np.mean(toavg)
print(toavg)
# Parallel function
def deltaz(j):
sampind=[7,16,22,31,45]
gal = []
for m in sampind:
gal.append(samples[m][j]-zt[j])
return np.mean(gal)
# Shared array for zt
zt_base = mp.Array(ctypes.c_double, int(len(zt)),lock=False)
ztArr = np.ctypeslib.as_array(zt_base)
#Shared array for samples
sample_base = mp.Array(ctypes.c_double, int(np.product(samples.shape)),lock=False)
sampArr = np.ctypeslib.as_array(sample_base)
sampArr = sampArr.reshape(samples.shape)
#Copy arrays to shared
sampArr[:,:] = samples[:,:]
ztArr[:] = zt[:]
with mp.Pool() as p:
result = p.map(deltaz,(np.linspace(0,Ngal-1,Ngal).astype(int)))
print(result)
这是一个产生相同结果的例子。您可以根据需要为此增加更多的复杂性,但我会阅读一般的多处理和内存 types/scopes 以了解什么会起作用,什么不会起作用。当您进入多处理世界时,您必须更加小心。如果这没有帮助,请告诉我,我会尝试对其进行更新以使其有效。