在多处理中使用共享数组来保存值

Using a Shared Array in multiprocessing to save values

我正在尝试让 python 多处理工作来加速我编写的代码。代码如下所示:

from multiprocessing import Array, Pool
import numpy as np
#setting up shared memory array
global misfit
misfit = Array('d', np.empty((dim1,dim2,dim3,dim4)).flat)

#looping through some values
for i in xrange(0,1):
     #setting up pool
     pool = Pool()
     p = [pool.apply_async(self.testfunc,args=(somevals,j)) for j in xrange(0,1)]
     pool.close()
     pool.join()

其中 self.testfunc 看起来像:

 def testfunc(self,somevals,j):
      #some calculations
      for k in xrange(0,1):
           #some calculations
           for mn in xrange(0,1):
                 #some more calculations
                 #save results
                 result = i*j*k*mn # example
                 misfit[i*j*k*mn] = result

我的问题是,当我 运行 这个 none 的值保存在共享数组中时,它仍然是空的。我知道这可能与全局变量有关,但在使用此确切设置的更简单的程序中,值将保存到数组中。该数组在完整程序中也非常大(4561920000 个值)。此外,如果我在池外调用此函数,它会工作并且值会被保存。

所以我的问题是我在这里做错了什么?我是否错误地发送了共享数组?

编辑:我想我会添加有效的代码:

from multiprocessing import Array, Pool
from numpy import empty, sin
from time import time
import numpy as np

def initarr():
  a = Array('d', empty((5, 50, 80)).flat)
  return a

def testfunc(i, j, k):
  count = (i*50*80) + (j*80) + k
  x = sin(k)
  a[count] = x
  y = np.fft.fft(np.exp(2j*np.pi*np.arange(50000)/50000))


def process(i):
  start = time()
  pool = Pool()
  for j in xrange(0, 50):
    p = [pool.apply_async(testfunc, args=(i, j, k)) for k in xrange(0, 80)]
  pool.close()
  pool.join()
  print time() - start


global a
a = initarr()

for i in xrange(0, 5):
   process(i)

好的,在我们 IT 部门人员的帮助下,我终于有了一个可用的版本,所以对于将来看到这个问题的任何人,我都会 post 一个解决方案。我还没有真正使用过堆栈溢出,如果回答我自己的问题是不礼貌的,我很抱歉。

我们使用初始化函数实现了此功能,但我们必须确保初始化函数位于 与 运行 相同的文件(模块) 中在泳池旁边。所以在一个模块 (misc) 中我们有:

**misc.py**
def testfunc(self,somevals,j):
  #some calculations
  for k in xrange(0,len(krange)):
       #some calculations
       for mn in xrange(0,len(mnrange)):
             #some more calculations
             #save results
             loc = (i*len(jrange)*len(krange)*len(mnrange))+
                   (j*len(krange)*len(mnrange))+(k*len(mnrange))+mn
             result = i*j*k*mn # example
             misfit[loc] = result

def initpool(a):
    global misfit
    misfit = a

在主文件中我们有:

**main.py**
from multiprocessing import Array, Pool
from misc import initpool, testfunc
import numpy as np

#setting up shared memory array
misfit = Array('d', np.empty((dim1,dim2,dim3,dim4)).flat)

#looping through some values
for i in xrange(0,len(irange)):
     #setting up pool
     pool = Pool(initializer=initpool,initargs=(misfit,),processes=20)
     p = [pool.apply_async(testfunc,args=(somevals,j)) for j in xrange(0,len(jrange))]
     pool.close()
     pool.join()

print(misfit[0])

注意我们最初设置Array的时候,必须和你在initpool中设置的变量一样命名,在至少从我测试它的时候开始。

这可能不是最好的方法,但它确实有效,希望其他人能找到它的用处!