Python 多处理:当我向 ProcessPoolExecutor 提交对象列表时,提交的是副本还是参考?
Python Multiprocesssing: When i submit a list of objects to ProcessPoolExecutor, is a copy or a reference submitted?
我尝试并行化一个巨大项目的一部分。我得到了一个由点组成的点阵,并在点阵的每个单独点上进行了计算。为了加快计算速度,我想将格子的点细分为不同的子列表,并通过 ProcessPoolExecutor 对各个进程执行计算。但是,如果我将列表传递给 ProcessPoolExecutor,它似乎复制了列表并且没有传递任何引用。
我的代码的最小版本如下:
from time import time
from concurrent.futures import ProcessPoolExecutor
class Lattice():
def __init__(self, rndl):
self.Points = []
for val in rndl:
P = Point(val)
self.Points.append(P)
class Point():
def __init__(self, val):
self.val = val
def calculate(self, it = 100):
for i in range(it):
some_tmp_value = self.val**(1.0/10)
some_tmp_value = some_tmp_value**(1.0/10)
self.val -= 1 #update values
def do_calculation_ser_2(lattice, it = 100):
workers = 2
splitted_list = [lattice.Points[i::workers] for i in range(workers)]
for sublist in splitted_list:
for P in sublist:
P.calculate(it)
def par_calc_helper(sublist, it):
for i,P in enumerate(sublist):
P.calculate(it)
return None
def do_calculation_par_partitioning(lattice, it):
workers = 2 #define number of subprocesses
#split list in chunks
splitted_list = [lattice.Points[i::workers] for i in range(workers)]
with ProcessPoolExecutor(max_workers = workers) as executor:
for i, sublist in enumerate(splitted_list):
future = executor.submit(par_calc_helper, sublist, it)
def check_calc(lattice, rndl):
for i,(P, val_rnd) in enumerate(zip(lattice.Points, rndl)):
error = False
P.val += 1
if P.val != val_rnd:
print("ERROR - Calulation gone wrong")
error = True
break
if not error:
print("Calculation gone right")
if __name__ == '__main__':
max_iter = 2*8
rndl = [4,4,4,4]
lat = Lattice(rndl)
do_calculation_ser_2(lat, max_iter)
check_calc(lat, rndl)
do_calculation_par_partitioning(lat, max_iter)
check_calc(lat, rndl)
输出:
Calculation gone right
ERROR - Calulation gone wrong
格子class只是所有点的容器。 class 点只有一个值和一个计算方法。 calculate方法里面的for循环只是执行函数需要一些时间。结果是点值减 1,所以我可以很容易地检查稍后是否正确完成了计算。
那么do_calculation_ser_2方法就是计算的连载版。我将点列表拆分为多个子列表并遍历它们并执行计算。我知道拆分在这里没有用,但我想保持它与并行版本相似,这样我可以更好地解决错误。
do_calculation_par_partitioning方法用于并行化我的计算。首先,我将格子的点细分为子列表。然后我使用辅助函数迭代子列表并将辅助函数与子列表一起传递到我的 ProcessPoolExecutor。
最后,check_calc函数用于检查计算是否正确,并将点值再次递增1以获得之前的点阵。
作为测试,我只初始化4个值为4的点并进行计算。当我尝试 运行 它时,串行版本工作得很好。但是,并行版本不适用于积分。计算方法被正确调用(我可以看到,如果我在方法中插入打印语句),但结果值设置不正确,保持 4 而不是预期的 3。
我假设将对象列表传递给 ProcessPoolExecutor 将复制列表中的对象,而不是仅仅引用它们(如在串行版本中)。是这样吗?如果是,我如何传递对象列表而不每次都复制它们? (对于非常大的计算来说会很糟糕)。
传递列表副本并用计算结果替换主列表效果最好还是有更好的方法?
如果我的方法完全错误,并且您在 Python 中有更好的想法,请告诉我。
你在进程边界上敲你的头......更严重的是,ProcessPoolExecutor 启动了一些工作进程进程,序列化将传递给它们的值并通过管道中的序列化值。然后每个worker在自己的内存区工作,master进程数据无法更新
一种可能的方法是每个工作人员 return 都有其修改后的列表(将再次序列化)并且父级将它们连接起来。或者,多处理模块使用共享内存提供共享类型,但它是以额外同步为代价的。在您的用例中,我可能只是 return 修改后的列表。
我尝试并行化一个巨大项目的一部分。我得到了一个由点组成的点阵,并在点阵的每个单独点上进行了计算。为了加快计算速度,我想将格子的点细分为不同的子列表,并通过 ProcessPoolExecutor 对各个进程执行计算。但是,如果我将列表传递给 ProcessPoolExecutor,它似乎复制了列表并且没有传递任何引用。
我的代码的最小版本如下:
from time import time
from concurrent.futures import ProcessPoolExecutor
class Lattice():
def __init__(self, rndl):
self.Points = []
for val in rndl:
P = Point(val)
self.Points.append(P)
class Point():
def __init__(self, val):
self.val = val
def calculate(self, it = 100):
for i in range(it):
some_tmp_value = self.val**(1.0/10)
some_tmp_value = some_tmp_value**(1.0/10)
self.val -= 1 #update values
def do_calculation_ser_2(lattice, it = 100):
workers = 2
splitted_list = [lattice.Points[i::workers] for i in range(workers)]
for sublist in splitted_list:
for P in sublist:
P.calculate(it)
def par_calc_helper(sublist, it):
for i,P in enumerate(sublist):
P.calculate(it)
return None
def do_calculation_par_partitioning(lattice, it):
workers = 2 #define number of subprocesses
#split list in chunks
splitted_list = [lattice.Points[i::workers] for i in range(workers)]
with ProcessPoolExecutor(max_workers = workers) as executor:
for i, sublist in enumerate(splitted_list):
future = executor.submit(par_calc_helper, sublist, it)
def check_calc(lattice, rndl):
for i,(P, val_rnd) in enumerate(zip(lattice.Points, rndl)):
error = False
P.val += 1
if P.val != val_rnd:
print("ERROR - Calulation gone wrong")
error = True
break
if not error:
print("Calculation gone right")
if __name__ == '__main__':
max_iter = 2*8
rndl = [4,4,4,4]
lat = Lattice(rndl)
do_calculation_ser_2(lat, max_iter)
check_calc(lat, rndl)
do_calculation_par_partitioning(lat, max_iter)
check_calc(lat, rndl)
输出:
Calculation gone right
ERROR - Calulation gone wrong
格子class只是所有点的容器。 class 点只有一个值和一个计算方法。 calculate方法里面的for循环只是执行函数需要一些时间。结果是点值减 1,所以我可以很容易地检查稍后是否正确完成了计算。
那么do_calculation_ser_2方法就是计算的连载版。我将点列表拆分为多个子列表并遍历它们并执行计算。我知道拆分在这里没有用,但我想保持它与并行版本相似,这样我可以更好地解决错误。
do_calculation_par_partitioning方法用于并行化我的计算。首先,我将格子的点细分为子列表。然后我使用辅助函数迭代子列表并将辅助函数与子列表一起传递到我的 ProcessPoolExecutor。
最后,check_calc函数用于检查计算是否正确,并将点值再次递增1以获得之前的点阵。
作为测试,我只初始化4个值为4的点并进行计算。当我尝试 运行 它时,串行版本工作得很好。但是,并行版本不适用于积分。计算方法被正确调用(我可以看到,如果我在方法中插入打印语句),但结果值设置不正确,保持 4 而不是预期的 3。
我假设将对象列表传递给 ProcessPoolExecutor 将复制列表中的对象,而不是仅仅引用它们(如在串行版本中)。是这样吗?如果是,我如何传递对象列表而不每次都复制它们? (对于非常大的计算来说会很糟糕)。 传递列表副本并用计算结果替换主列表效果最好还是有更好的方法?
如果我的方法完全错误,并且您在 Python 中有更好的想法,请告诉我。
你在进程边界上敲你的头......更严重的是,ProcessPoolExecutor 启动了一些工作进程进程,序列化将传递给它们的值并通过管道中的序列化值。然后每个worker在自己的内存区工作,master进程数据无法更新
一种可能的方法是每个工作人员 return 都有其修改后的列表(将再次序列化)并且父级将它们连接起来。或者,多处理模块使用共享内存提供共享类型,但它是以额外同步为代价的。在您的用例中,我可能只是 return 修改后的列表。