分别改变并行进程中的不同 python 个对象
Altering different python objects in parallel processes, respectively
一言以蔽之
我想同时更改复杂的 python 对象,每个对象仅由一个进程处理。我该怎么做(最有效)?实施某种酸洗支持会有帮助吗?这样有效率吗?
全题
我有一个 python 数据结构 ArrayDict
,它基本上由一个 numpy
数组和一个字典组成,并将任意索引映射到数组中的行。在我的例子中,所有键都是整数。
a = ArrayDict()
a[1234] = 12.5
a[10] = 3
print(a[1234]) #12.5
print(a[10]) # 3.0
print(a[1234] == a.array[a.indexDict[1234]]) #true
现在我有多个这样的ArrayDict
,想把它们填入myMethod(arrayDict, params)
。由于 myMethod
很昂贵,我想 运行 并行。请注意 myMethod
可能会向 arrayDict
添加许多行。每个进程都会改变自己的 ArrayDict
。我不需要并发访问 ArrayDict
s.
在 myMethod
中,我更改了 arrayDict
中的条目(即,我更改了内部 numpy
数组),我将条目添加到 arrayDict
(即是,我向字典添加另一个索引并在内部数组中写入一个新值)。最终,我希望能够在 arrayDict
的内部 numpy
数组变得太小时交换它。这种情况不会经常发生,如果没有更好的解决方案,我可以在程序的非并行部分执行此操作。即使没有数组交换,我自己的尝试也没有成功。
我花了几天时间研究共享内存和 python 的 multiprocessing module. Since I will finally be working on linux, the task seemed to be rather simple: the system call fork()
allows to work with copies of the arguments efficiently. My thought was then to change each ArrayDict
in its own process, return the changed version of the object, and overwrite the original object. To save memory and save the work for copying, I used in addition sharedmem 数组以将数据存储在 ArrayDict
中。我知道字典还是要抄的。
from sharedmem import sharedmem
import numpy as np
n = ... # length of the data array
myData = np.empty(n, dtype=object)
myData[:] = [ArrayDict() for _ in range(n)]
done = False
while not done:
consideredData = ... # numpy boolean array of length
# n with True at the index of
# considered data
args = ... # numpy array containing arguments
# for myMethod
with sharedmem.MapReduce() as pool:
results = pool.map(myMethod,
list(zip(myData[considered],
args[considered])),
star=True)
myData[considered] = results
done = ... # depends on what happens in
# myMethod
我得到的是分段错误。我能够通过创建 ArrayDict
到 myMethod
的深层副本并将它们保存到 myData
来避免这个错误。我真的不明白为什么这是必要的,并且频繁地复制我的(可能非常大的)数组(while 循环需要很长时间)对我来说似乎不是有效的。但是,至少它在一定程度上起到了作用。尽管如此,由于共享内存,我的程序在第 3 次迭代时有一些错误行为。因此,我认为我的方法不是最优的。
我读到 here and here 可以使用 multiprocessing.Array
在共享内存上保存任意 numpy 数组。但是,我仍然需要共享整个 ArrayDict
,其中特别包括一本字典,而这又是不可挑选的。
我怎样才能有效地实现我的目标?是否有可能(并且有效)以某种方式使我的对象可拾取?
所有解决方案必须 运行 python 3 和完全 numpy/scipy 支持 64 位 Linux。
编辑
我发现 here 可以使用多处理 "Manager" 类 和用户定义的代理 类 以某种方式共享任意对象。这会有效率吗?我想利用我不需要并发访问对象,即使它们没有在主进程中处理。是否可以为我要处理的每个对象创建一个管理器? (我可能对经理的工作方式还有一些误解。)
这似乎相当复杂 class,我无法完全预测此解决方案是否适用于您的情况。这种复杂的 class 的简单折衷是使用 ProcessPoolExecutor
.
如果这不能回答您的问题,那么最好使用一个最小的、有效的示例。
from concurrent.futures import ProcessPoolExecutor
import numpy as np
class ArrayDict ():
keys = None
vals = None
def __init__ (self):
self.keys = dict ()
self.vals = np.random.rand (1000)
def __str__ (self):
return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())
def myMethod (ad, args):
print ("starting:", ad)
if __name__ == '__main__':
l = [ArrayDict() for _ in range (5)]
args = [2, 3, 4, 1, 3]
with ProcessPoolExecutor (max_workers = 2) as ex:
d = ex.map (myMethod, l, args)
对象 cloned 发送到子进程时,您需要 return 结果(因为对对象的更改不会传播回主进程)并按照您的意愿进行处理存储它们。
Note that changes to class variables will propagate to other objects in the
same process, e.g. if you have more tasks than processes, changes to class
variables will be shared among the instances running in the same process. This is usually undesired behavior.
这是并行化的高级接口。 ProcessPoolExecutor
使用 multiprocessing
模块并且只能与 pickable objects. I suspect that ProcessPoolExecutor
has performance similar to "sharing state between processes". Under the hood, ProcessPoolExecutor
is using multiprocessing.Process
, and should exhibit similar performance as Pool
(except when using very long iterables 配合使用 map)。 ProcessPoolExecutor
似乎是 python 中并发任务的预期未来 API。
如果可以的话,通常使用 ThreadPoolExecutor
会更快(可以换成 ProcessPoolExecutor
)。在这种情况下,对象 在进程之间共享 ,对一个对象的更新将传播回主线程。
如前所述,最快的选择可能是重新构造 ArrayDict
,以便它仅使用可以由 multiprocessing.Value
或 Array
.
表示的对象
如果 ProcessPoolExecutor
不起作用,并且您无法优化 ArrayDict
,您可能无法使用 Manager
. There are good examples on how to do that here。
The greatest performance gain is often likely to be found in myMethod
.
And, as I mentioned, the overhead of using threads is less than that of processes.
一言以蔽之
我想同时更改复杂的 python 对象,每个对象仅由一个进程处理。我该怎么做(最有效)?实施某种酸洗支持会有帮助吗?这样有效率吗?
全题
我有一个 python 数据结构 ArrayDict
,它基本上由一个 numpy
数组和一个字典组成,并将任意索引映射到数组中的行。在我的例子中,所有键都是整数。
a = ArrayDict()
a[1234] = 12.5
a[10] = 3
print(a[1234]) #12.5
print(a[10]) # 3.0
print(a[1234] == a.array[a.indexDict[1234]]) #true
现在我有多个这样的ArrayDict
,想把它们填入myMethod(arrayDict, params)
。由于 myMethod
很昂贵,我想 运行 并行。请注意 myMethod
可能会向 arrayDict
添加许多行。每个进程都会改变自己的 ArrayDict
。我不需要并发访问 ArrayDict
s.
在 myMethod
中,我更改了 arrayDict
中的条目(即,我更改了内部 numpy
数组),我将条目添加到 arrayDict
(即是,我向字典添加另一个索引并在内部数组中写入一个新值)。最终,我希望能够在 arrayDict
的内部 numpy
数组变得太小时交换它。这种情况不会经常发生,如果没有更好的解决方案,我可以在程序的非并行部分执行此操作。即使没有数组交换,我自己的尝试也没有成功。
我花了几天时间研究共享内存和 python 的 multiprocessing module. Since I will finally be working on linux, the task seemed to be rather simple: the system call fork()
allows to work with copies of the arguments efficiently. My thought was then to change each ArrayDict
in its own process, return the changed version of the object, and overwrite the original object. To save memory and save the work for copying, I used in addition sharedmem 数组以将数据存储在 ArrayDict
中。我知道字典还是要抄的。
from sharedmem import sharedmem
import numpy as np
n = ... # length of the data array
myData = np.empty(n, dtype=object)
myData[:] = [ArrayDict() for _ in range(n)]
done = False
while not done:
consideredData = ... # numpy boolean array of length
# n with True at the index of
# considered data
args = ... # numpy array containing arguments
# for myMethod
with sharedmem.MapReduce() as pool:
results = pool.map(myMethod,
list(zip(myData[considered],
args[considered])),
star=True)
myData[considered] = results
done = ... # depends on what happens in
# myMethod
我得到的是分段错误。我能够通过创建 ArrayDict
到 myMethod
的深层副本并将它们保存到 myData
来避免这个错误。我真的不明白为什么这是必要的,并且频繁地复制我的(可能非常大的)数组(while 循环需要很长时间)对我来说似乎不是有效的。但是,至少它在一定程度上起到了作用。尽管如此,由于共享内存,我的程序在第 3 次迭代时有一些错误行为。因此,我认为我的方法不是最优的。
我读到 here and here 可以使用 multiprocessing.Array
在共享内存上保存任意 numpy 数组。但是,我仍然需要共享整个 ArrayDict
,其中特别包括一本字典,而这又是不可挑选的。
我怎样才能有效地实现我的目标?是否有可能(并且有效)以某种方式使我的对象可拾取?
所有解决方案必须 运行 python 3 和完全 numpy/scipy 支持 64 位 Linux。
编辑
我发现 here 可以使用多处理 "Manager" 类 和用户定义的代理 类 以某种方式共享任意对象。这会有效率吗?我想利用我不需要并发访问对象,即使它们没有在主进程中处理。是否可以为我要处理的每个对象创建一个管理器? (我可能对经理的工作方式还有一些误解。)
这似乎相当复杂 class,我无法完全预测此解决方案是否适用于您的情况。这种复杂的 class 的简单折衷是使用 ProcessPoolExecutor
.
如果这不能回答您的问题,那么最好使用一个最小的、有效的示例。
from concurrent.futures import ProcessPoolExecutor
import numpy as np
class ArrayDict ():
keys = None
vals = None
def __init__ (self):
self.keys = dict ()
self.vals = np.random.rand (1000)
def __str__ (self):
return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())
def myMethod (ad, args):
print ("starting:", ad)
if __name__ == '__main__':
l = [ArrayDict() for _ in range (5)]
args = [2, 3, 4, 1, 3]
with ProcessPoolExecutor (max_workers = 2) as ex:
d = ex.map (myMethod, l, args)
对象 cloned 发送到子进程时,您需要 return 结果(因为对对象的更改不会传播回主进程)并按照您的意愿进行处理存储它们。
Note that changes to class variables will propagate to other objects in the same process, e.g. if you have more tasks than processes, changes to class variables will be shared among the instances running in the same process. This is usually undesired behavior.
这是并行化的高级接口。 ProcessPoolExecutor
使用 multiprocessing
模块并且只能与 pickable objects. I suspect that ProcessPoolExecutor
has performance similar to "sharing state between processes". Under the hood, ProcessPoolExecutor
is using multiprocessing.Process
, and should exhibit similar performance as Pool
(except when using very long iterables 配合使用 map)。 ProcessPoolExecutor
似乎是 python 中并发任务的预期未来 API。
如果可以的话,通常使用 ThreadPoolExecutor
会更快(可以换成 ProcessPoolExecutor
)。在这种情况下,对象 在进程之间共享 ,对一个对象的更新将传播回主线程。
如前所述,最快的选择可能是重新构造 ArrayDict
,以便它仅使用可以由 multiprocessing.Value
或 Array
.
如果 ProcessPoolExecutor
不起作用,并且您无法优化 ArrayDict
,您可能无法使用 Manager
. There are good examples on how to do that here。
The greatest performance gain is often likely to be found in
myMethod
. And, as I mentioned, the overhead of using threads is less than that of processes.