分别改变并行进程中的不同 python 个对象

Altering different python objects in parallel processes, respectively

一言以蔽之

我想同时更改复杂的 python 对象,每个对象仅由一个进程处理。我该怎么做(最有效)?实施某种酸洗支持会有帮助吗?这样有效率吗?

全题

我有一个 python 数据结构 ArrayDict,它基本上由一个 numpy 数组和一个字典组成,并将任意索引映射到数组中的行。在我的例子中,所有键都是整数。

a = ArrayDict()

a[1234] = 12.5
a[10] = 3

print(a[1234])                               #12.5
print(a[10])                                 # 3.0

print(a[1234] == a.array[a.indexDict[1234]]) #true

现在我有多个这样的ArrayDict,想把它们填入myMethod(arrayDict, params)。由于 myMethod 很昂贵,我想 运行 并行。请注意 myMethod 可能会向 arrayDict 添加许多行。每个进程都会改变自己的 ArrayDict。我不需要并发访问 ArrayDicts.

myMethod 中,我更改了 arrayDict 中的条目(即,我更改了内部 numpy 数组),我将条目添加到 arrayDict(即是,我向字典添加另一个索引并在内部数组中写入一个新值)。最终,我希望能够在 arrayDict 的内部 numpy 数组变得太小时交换它。这种情况不会经常发生,如果没有更好的解决方案,我可以在程序的非并行部分执行此操作。即使没有数组交换,我自己的尝试也没有成功。

我花了几天时间研究共享内存和 python 的 multiprocessing module. Since I will finally be working on linux, the task seemed to be rather simple: the system call fork() allows to work with copies of the arguments efficiently. My thought was then to change each ArrayDict in its own process, return the changed version of the object, and overwrite the original object. To save memory and save the work for copying, I used in addition sharedmem 数组以将数据存储在 ArrayDict 中。我知道字典还是要抄的。

from sharedmem import sharedmem
import numpy as np

n = ...                   # length of the data array
myData = np.empty(n, dtype=object)
myData[:] = [ArrayDict() for _ in range(n)]
done = False

while not done:
    consideredData = ...  # numpy boolean array of length
                          # n with True at the index of
                          # considered data
    args = ...            # numpy array containing arguments
                          # for myMethod

    with sharedmem.MapReduce() as pool:
        results = pool.map(myMethod, 
                           list(zip(myData[considered], 
                                    args[considered])),
                           star=True)
        myData[considered] = results

    done = ...             # depends on what happens in
                           # myMethod

我得到的是分段错误。我能够通过创建 ArrayDictmyMethod 的深层副本并将它们保存到 myData 来避免这个错误。我真的不明白为什么这是必要的,并且频繁地复制我的(可能非常大的)数组(while 循环需要很长时间)对我来说似乎不是有效的。但是,至少它在一定程度上起到了作用。尽管如此,由于共享内存,我的程序在第 3 次迭代时有一些错误行为。因此,我认为我的方法不是最优的。

我读到 here and here 可以使用 multiprocessing.Array 在共享内存上保存任意 numpy 数组。但是,我仍然需要共享整个 ArrayDict,其中特别包括一本字典,而这又是不可挑选的。

我怎样才能有效地实现我的目标?是否有可能(并且有效)以某种方式使我的对象可拾取?

所有解决方案必须 运行 python 3 和完全 numpy/scipy 支持 64 位 Linux。

编辑

我发现 here 可以使用多处理 "Manager" 类 和用户定义的代理 类 以某种方式共享任意对象。这会有效率吗?我想利用我不需要并发访问对象,即使它们没有在主进程中处理。是否可以为我要处理的每个对象创建一个管理器? (我可能对经理的工作方式还有一些误解。)

这似乎相当复杂 class,我无法完全预测此解决方案是否适用于您的情况。这种复杂的 class 的简单折衷是使用 ProcessPoolExecutor.

如果这不能回答您的问题,那么最好使用一个最小的、有效的示例。

from concurrent.futures import ProcessPoolExecutor

import numpy as np

class ArrayDict ():
  keys = None
  vals = None

  def __init__ (self):
    self.keys = dict ()
    self.vals = np.random.rand (1000)

  def __str__ (self):
    return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())

def myMethod (ad, args):
  print ("starting:", ad)


if __name__ == '__main__':
  l     = [ArrayDict() for _ in range (5)]
  args  = [2, 3, 4, 1, 3]

  with ProcessPoolExecutor (max_workers = 2) as ex:

    d = ex.map (myMethod, l, args)

对象 cloned 发送到子进程时,您需要 return 结果(因为对对象的更改不会传播回主进程)并按照您的意愿进行处理存储它们。

Note that changes to class variables will propagate to other objects in the same process, e.g. if you have more tasks than processes, changes to class variables will be shared among the instances running in the same process. This is usually undesired behavior.

这是并行化的高级接口。 ProcessPoolExecutor 使用 multiprocessing 模块并且只能与 pickable objects. I suspect that ProcessPoolExecutor has performance similar to "sharing state between processes". Under the hood, ProcessPoolExecutor is using multiprocessing.Process, and should exhibit similar performance as Pool (except when using very long iterables 配合使用 map)。 ProcessPoolExecutor 似乎是 python 中并发任务的预期未来 API。

如果可以的话,通常使用 ThreadPoolExecutor 会更快(可以换成 ProcessPoolExecutor)。在这种情况下,对象 在进程之间共享 ,对一个对象的更新将传播回主线程​​。

如前所述,最快的选择可能是重新构造 ArrayDict,以便它仅使用可以由 multiprocessing.ValueArray.

表示的对象

如果 ProcessPoolExecutor 不起作用,并且您无法优化 ArrayDict,您可能无法使用 Manager. There are good examples on how to do that here

The greatest performance gain is often likely to be found in myMethod. And, as I mentioned, the overhead of using threads is less than that of processes.