在多处理中使用共享数组来保存值
Using a Shared Array in multiprocessing to save values
我正在尝试让 python 多处理工作来加速我编写的代码。代码如下所示:
from multiprocessing import Array, Pool
import numpy as np
#setting up shared memory array
global misfit
misfit = Array('d', np.empty((dim1,dim2,dim3,dim4)).flat)
#looping through some values
for i in xrange(0,1):
#setting up pool
pool = Pool()
p = [pool.apply_async(self.testfunc,args=(somevals,j)) for j in xrange(0,1)]
pool.close()
pool.join()
其中 self.testfunc 看起来像:
def testfunc(self,somevals,j):
#some calculations
for k in xrange(0,1):
#some calculations
for mn in xrange(0,1):
#some more calculations
#save results
result = i*j*k*mn # example
misfit[i*j*k*mn] = result
我的问题是,当我 运行 这个 none 的值保存在共享数组中时,它仍然是空的。我知道这可能与全局变量有关,但在使用此确切设置的更简单的程序中,值将保存到数组中。该数组在完整程序中也非常大(4561920000 个值)。此外,如果我在池外调用此函数,它会工作并且值会被保存。
所以我的问题是我在这里做错了什么?我是否错误地发送了共享数组?
编辑:我想我会添加有效的代码:
from multiprocessing import Array, Pool
from numpy import empty, sin
from time import time
import numpy as np
def initarr():
a = Array('d', empty((5, 50, 80)).flat)
return a
def testfunc(i, j, k):
count = (i*50*80) + (j*80) + k
x = sin(k)
a[count] = x
y = np.fft.fft(np.exp(2j*np.pi*np.arange(50000)/50000))
def process(i):
start = time()
pool = Pool()
for j in xrange(0, 50):
p = [pool.apply_async(testfunc, args=(i, j, k)) for k in xrange(0, 80)]
pool.close()
pool.join()
print time() - start
global a
a = initarr()
for i in xrange(0, 5):
process(i)
好的,在我们 IT 部门人员的帮助下,我终于有了一个可用的版本,所以对于将来看到这个问题的任何人,我都会 post 一个解决方案。我还没有真正使用过堆栈溢出,如果回答我自己的问题是不礼貌的,我很抱歉。
我们使用初始化函数实现了此功能,但我们必须确保初始化函数位于 与 运行 相同的文件(模块) 中在泳池旁边。所以在一个模块 (misc) 中我们有:
**misc.py**
def testfunc(self,somevals,j):
#some calculations
for k in xrange(0,len(krange)):
#some calculations
for mn in xrange(0,len(mnrange)):
#some more calculations
#save results
loc = (i*len(jrange)*len(krange)*len(mnrange))+
(j*len(krange)*len(mnrange))+(k*len(mnrange))+mn
result = i*j*k*mn # example
misfit[loc] = result
def initpool(a):
global misfit
misfit = a
在主文件中我们有:
**main.py**
from multiprocessing import Array, Pool
from misc import initpool, testfunc
import numpy as np
#setting up shared memory array
misfit = Array('d', np.empty((dim1,dim2,dim3,dim4)).flat)
#looping through some values
for i in xrange(0,len(irange)):
#setting up pool
pool = Pool(initializer=initpool,initargs=(misfit,),processes=20)
p = [pool.apply_async(testfunc,args=(somevals,j)) for j in xrange(0,len(jrange))]
pool.close()
pool.join()
print(misfit[0])
注意我们最初设置Array的时候,必须和你在initpool中设置的变量一样命名,在至少从我测试它的时候开始。
这可能不是最好的方法,但它确实有效,希望其他人能找到它的用处!
我正在尝试让 python 多处理工作来加速我编写的代码。代码如下所示:
from multiprocessing import Array, Pool
import numpy as np
#setting up shared memory array
global misfit
misfit = Array('d', np.empty((dim1,dim2,dim3,dim4)).flat)
#looping through some values
for i in xrange(0,1):
#setting up pool
pool = Pool()
p = [pool.apply_async(self.testfunc,args=(somevals,j)) for j in xrange(0,1)]
pool.close()
pool.join()
其中 self.testfunc 看起来像:
def testfunc(self,somevals,j):
#some calculations
for k in xrange(0,1):
#some calculations
for mn in xrange(0,1):
#some more calculations
#save results
result = i*j*k*mn # example
misfit[i*j*k*mn] = result
我的问题是,当我 运行 这个 none 的值保存在共享数组中时,它仍然是空的。我知道这可能与全局变量有关,但在使用此确切设置的更简单的程序中,值将保存到数组中。该数组在完整程序中也非常大(4561920000 个值)。此外,如果我在池外调用此函数,它会工作并且值会被保存。
所以我的问题是我在这里做错了什么?我是否错误地发送了共享数组?
编辑:我想我会添加有效的代码:
from multiprocessing import Array, Pool
from numpy import empty, sin
from time import time
import numpy as np
def initarr():
a = Array('d', empty((5, 50, 80)).flat)
return a
def testfunc(i, j, k):
count = (i*50*80) + (j*80) + k
x = sin(k)
a[count] = x
y = np.fft.fft(np.exp(2j*np.pi*np.arange(50000)/50000))
def process(i):
start = time()
pool = Pool()
for j in xrange(0, 50):
p = [pool.apply_async(testfunc, args=(i, j, k)) for k in xrange(0, 80)]
pool.close()
pool.join()
print time() - start
global a
a = initarr()
for i in xrange(0, 5):
process(i)
好的,在我们 IT 部门人员的帮助下,我终于有了一个可用的版本,所以对于将来看到这个问题的任何人,我都会 post 一个解决方案。我还没有真正使用过堆栈溢出,如果回答我自己的问题是不礼貌的,我很抱歉。
我们使用初始化函数实现了此功能,但我们必须确保初始化函数位于 与 运行 相同的文件(模块) 中在泳池旁边。所以在一个模块 (misc) 中我们有:
**misc.py**
def testfunc(self,somevals,j):
#some calculations
for k in xrange(0,len(krange)):
#some calculations
for mn in xrange(0,len(mnrange)):
#some more calculations
#save results
loc = (i*len(jrange)*len(krange)*len(mnrange))+
(j*len(krange)*len(mnrange))+(k*len(mnrange))+mn
result = i*j*k*mn # example
misfit[loc] = result
def initpool(a):
global misfit
misfit = a
在主文件中我们有:
**main.py**
from multiprocessing import Array, Pool
from misc import initpool, testfunc
import numpy as np
#setting up shared memory array
misfit = Array('d', np.empty((dim1,dim2,dim3,dim4)).flat)
#looping through some values
for i in xrange(0,len(irange)):
#setting up pool
pool = Pool(initializer=initpool,initargs=(misfit,),processes=20)
p = [pool.apply_async(testfunc,args=(somevals,j)) for j in xrange(0,len(jrange))]
pool.close()
pool.join()
print(misfit[0])
注意我们最初设置Array的时候,必须和你在initpool中设置的变量一样命名,在至少从我测试它的时候开始。
这可能不是最好的方法,但它确实有效,希望其他人能找到它的用处!