Python - 尝试在 class 内进行并行计算

Python - attempt at parallelising calculation inside class

我有以下工作代码:

from itertools import product
import time
import numpy as np

class Basis_NonI(object):
    '''Non-interfering case: integer basis vectors grouped by generation and by kinetic energy.'''

    def __init__(self, dimension, generation, embed_dimension = None):
        '''Store the number of vector components and the maximum generation.

        embed_dimension is accepted for interface compatibility but is not used.
        '''
        self.dimension = dimension
        self.generation = generation

    def KineticEnergy(self, vec):
        '''Return 4*|vec**2|, computed element-wise on a basis vector.

        NOTE(review): the original docstring cited the formula 4*|M·b|^2, but no
        matrix M is applied here and the components are not summed -- confirm
        which one is intended.
        '''
        return 4*np.abs(vec**2)

    def make_basis(self):
        '''Build the basis vectors and group them by generation and by kinetic energy.

        Returns a tuple (group_gens, kinenergy_gens, group_kin, generation_kin, uniqueK):
          group_gens      -- one list of vectors per generation 0 .. generation-1
          kinenergy_gens  -- the kinetic energies of those vectors, same layout
          group_kin       -- one list of vectors per distinct kinetic-energy value
          generation_kin  -- the generations of those vectors, one array per value
          uniqueK         -- sorted distinct kinetic-energy values

        Prints the elapsed wall-clock time as a side effect.
        '''
        t = time.time()
        # np.arange excludes its upper bound: components run from -generation to generation-1.
        x = np.arange(-self.generation, self.generation)
        # Every combination of components: the full cube of candidate basis states.
        array = np.array([np.array(i) for i in product(x, repeat = self.dimension)])
        # Generation of a vector = sum of the absolute values of its components.
        gens = np.abs(array).sum(axis=1)
        # The cube also contains vectors above the maximum generation; drop them.
        mask = np.where(gens<=self.generation)[0]
        array = array[mask]
        gens = gens[mask]

        # A list comprehension instead of map(): on Python 3 map() returns a lazy
        # iterator, which np.array would wrap as a single 0-d object instead of
        # stacking the per-vector results.
        kins = np.array([self.KineticEnergy(vec) for vec in array])
        uniqueK = np.unique(kins)

        group_gens = []
        kinenergy_gens = []

        # NOTE(review): this loop stops at generation-1, so vectors whose generation
        # equals self.generation pass the mask above but are not listed here.
        for i in np.arange(self.generation):
            pos = np.where(gens==i)[0]
            group_gens.append([np.array(v) for v in array[pos]])
            kinenergy_gens.append([np.array(k) for k in kins[pos]])

        group_kin = []
        generation_kin = []

        for j in uniqueK:
            # Row indices of the vectors having a component energy equal to j.
            posK = np.where(kins==j)[0]
            group_kin.append([np.array(v) for v in array[posK]])
            generation_kin.append(gens[posK])

        print('Basis took {} s'.format(time.time()-t))

        return group_gens, kinenergy_gens, group_kin, generation_kin, uniqueK

if __name__ == '__main__':

    # dimension=1, generation=5.
    b = Basis_NonI(1, 5)
    # a is the tuple (group_gens, kinenergy_gens, group_kin, generation_kin, uniqueK).
    a = b.make_basis()

我正在尝试将其中的一些计算并行化以使其运行得更快(这样我就可以增加所涉及的向量的大小)。

make_basis 有两个 for 循环,它们各自填充两个列表。所以我的第一个想法是生成两个进程,让这两个循环并行执行:

from itertools import product
import time
import numpy as np
from functools import partial
from multiprocessing import Process

# NOTE(review): the class-body indentation appears to have been lost in this
# listing; the defs below take `self` and are called as methods (b.run()).
class Basis_NonI(object):
''' Non Interfering case'''

def __init__(self, dimension, generation, embed_dimension = None):
    # embed_dimension is accepted but neither stored nor used here.
    self.dimension = dimension
    self.generation = generation

def KineticEnergy(self, vec):
    # Element-wise 4*|vec**2| of a basis vector.
    return 4*np.abs(vec**2)

def generation_basis(self, group_gens, kinenergy_gens, gens, kins, array):
    # Appends, for each generation 0 .. generation-1, the vectors of that
    # generation to group_gens and their kinetic energies to kinenergy_gens,
    # then returns both lists.
    for i in np.arange(self.generation):
        pos = np.where(gens==i)[0]
        group_gens.append(map(np.array, array[pos]))
        kinenergy_gens.append(map(np.array, kins[pos]))
    return group_gens, kinenergy_gens

def kinetic_basis(self, uniqueK, kins, group_kin, generation_kin, gens, array):
    # Appends, for each distinct kinetic-energy value, the matching vectors to
    # group_kin and their generations to generation_kin, then returns both lists.
    for j in uniqueK:
        posK = np.where(kins==j)[0]
        group_kin.append(map(np.array, array[posK]))
        generation_kin.append(gens[posK])
    return group_kin, generation_kin

def run(self):
    # Builds the basis, then tries to fill the two groupings in two child processes.
    t = time.time()
    # np.arange excludes its upper bound: components run from -generation to generation-1.
    x = np.arange(-self.generation, self.generation)  
    array = np.array([np.array(i) for i in product(x, repeat = self.dimension)]) #cube of basis states
    # Generation of a vector = sum of the absolute values of its components.
    gens = np.abs(array).sum(axis=1)
    # Keep only the vectors whose generation does not exceed the maximum.
    mask = np.where(gens<=self.generation)[0] 
    array = array[mask]
    gens = gens[mask]
    kins = np.array(map(self.KineticEnergy, array))
    uniqueK = np.unique(kins)

    # Result lists created in the parent process.
    group_gens = []
    kinenergy_gens = []
    group_kin = []
    generation_kin = []

    # Bind the arguments positionally, in the order of each method's signature.
    func1 = partial(self.generation_basis, group_gens, kinenergy_gens, gens, kins, array)        
    func2 = partial(self.kinetic_basis, uniqueK, kins, group_kin, generation_kin, gens, array)

    # NOTE: each Process works on its own copy of the bound lists, and the
    # targets' return values are not sent back, so nothing computed in the
    # children reaches the four lists above.
    p1 = Process(target = func1)
    p2 = Process(target = func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print('Basis took {} s'.format(time.time()-t))
    # The four lists returned here are still the empty ones created in this process.
    return group_gens, kinenergy_gens, group_kin, generation_kin, uniqueK

if __name__ == '__main__':

    # dimension=1, generation=5.
    b = Basis_NonI(1, 5)
    # a is the tuple (group_gens, kinenergy_gens, group_kin, generation_kin, uniqueK).
    a = b.run()

但是当我尝试访问 a 时,它是一堆空列表,因此实际上并没有填充它们。

我做错了什么?

您需要意识到,每个新进程都会获得自己的一份内存空间副本(堆、栈等),因此子进程对列表所做的修改不会反映到父进程中。也就是说,您需要使用某种 IPC(进程间通信)把计算结果传回父进程,例如 Queue、Pipe 或共享内存(Shared Memory)。

您可以使用 Pool 及其 apply 或 map 方法来简化这一过程,它会替您完成 IPC,如下面取自文档的示例所示:

from multiprocessing import Pool

def f(x):
    """Return x multiplied by itself."""
    squared = x * x
    return squared

if __name__ == '__main__':
    # Pool of 5 worker processes; map applies f to each input in a worker and
    # returns the results to this process as a list.
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))