高效填充二维 SciPy 稀疏矩阵

Efficiently populating a 2D SciPy sparse matrix

我想在 python 中填充一个巨大的稀疏矩阵,目的是在二维中实现 Crank-Nicolson 数值方法。

我是通过使用两个嵌套的 for 循环遍历所有内部节点来完成的,但是速度非常慢。 因为它是带有矩阵的嵌套 for 循环,所以我想到了 Numba,但它不适用于稀疏矩阵。 由于内存问题,我无法在将矩阵作为参数传递给 numba-jitted 函数之前将其转换为密集格式。

我想问一下我应该寻找什么才能使函数填充下面的A矩阵更快?

我尝试使用 scipy.sparse.diags,但我又一次使用了两个嵌套的 for 循环,就像下面的(原始)代码一样。

问题是 k 值是从 ij 计算出来的,我不知道如何在没有双 for 循环的情况下操作它。

用双 for 循环填充 A 矩阵的代码是:

# @njit
def populate_A_matrix_b_vector(psi_gr, timeste, A, b, sys_float_info_min):

    # Fill all the interior nodes
    # -------------------------------------

    # Utilities:
    two_lambd = 2 * gl.lambd
    oneover4_times_1over_deltazsq = 0.25 / (gl.delta_z**2)
    oneover2_times_1over_deltazsq = 0.5 / (gl.delta_z**2)

    # For the interior nodes:
    for i in range(1, gl.N_z_divs):
        for j in range(1, gl.N_epsilon_divs):

            k = j*(gl.N_z_divs+1) + i
            
            ratio = 1 / ( (2*gl.lambd**2) * (j*gl.delta_epsilon)**two_lambd )

            A[k, k] = (-1/gl.delta_time) - ratio * (j**2 - (gl.lambd-0.5)**2 * 0.5) - oneover2_times_1over_deltazsq - \
                           0.5 * Voft_OFF_imag(i, j, gl.m)
                           
            A[k, k+1] = oneover4_times_1over_deltazsq
            A[k, k-1] = oneover4_times_1over_deltazsq
            A[k, k + (gl.N_z_divs+1)] = ratio * ( j**2*0.5 - (gl.lambd-1)*j*0.5 )
            A[k, k - (gl.N_z_divs+1)] = ratio * ( j**2*0.5 + (gl.lambd-1)*j*0.5 )

            b[k] = psi_gr[i,  j, timeste] * ( (-1/gl.delta_time) + ratio*j**2 - (gl.lambd-0.5)**2*0.5 + oneover2_times_1over_deltazsq + \
                                0.5 * Voft_OFF_imag(i, j, gl.m)  ) + \
                   psi_gr[i+1, j, timeste] * (-oneover4_times_1over_deltazsq) + \
                   psi_gr[i-1, j, timeste] * (-oneover4_times_1over_deltazsq) + \
                   psi_gr[i,   j+1, timeste] * ( -ratio * (j**2 * 0.5 - (gl.lambd-1) * j * 0.5)   ) + \
                   psi_gr[i,   j-1, timeste] * ( -ratio * (j**2 * 0.5 + (gl.lambd-1) * j * 0.5)   )
            
        if (i % 50 == 0):
            print("we have done iter" + str(i*gl.N_epsilon_divs + j) + "out of" + str(gl.N_z_divs*gl.N_epsilon_divs) + "iters")

    # Fill boundaries
    # -------------------------

    # Bottom boundary except corners: j = 0; i = 1, 2, 3, ..., gl.N_z_divs - 1
    j = 0
    for i in range(1, gl.N_z_divs):
        k = j*(gl.N_z_divs+1) + i
        A[k, k] = 1

        if ( np.real(psi_gr[i, 2, timeste]) >= sys_float_info_min and np.imag(psi_gr[i, 2, timeste]) >= sys_float_info_min):
            alpha = psi_gr[i, 1, timeste] / psi_gr[i, 2, timeste]
            if (np.arctan(np.imag(alpha) / np.real(alpha))) < 0:
                alpha = np.abs(alpha)
        else:
            alpha = 0.0

        # A[k, k+(gl.N_z_divs+1)] = alpha
        A[k, k+(gl.N_z_divs+1)] = 0.0
    
    # Top boundary except corners: j = gl.N_epsilon_divs; i = 1, 2, 3, ..., gl.N_z_divs - 1
    j = gl.N_epsilon_divs
    for i in range(1, gl.N_z_divs):
        k = j*(gl.N_z_divs+1) + i
        A[k, k] = 1

        if (np.real(psi_gr[i, gl.N_epsilon_divs-2, timeste])>= sys_float_info_min and np.imag(psi_gr[i, gl.N_epsilon_divs-2, timeste]) >= sys_float_info_min):
            alpha = psi_gr[i, gl.N_epsilon_divs-1, timeste] / psi_gr[i, gl.N_epsilon_divs-2, timeste]
            if (np.arctan(np.imag(alpha) / np.real(alpha))) < 0:
                alpha = np.abs(alpha)
        else:
            alpha = 0.0

        # A[k, k-(gl.N_z_divs+1)] = alpha
        A[k, k-(gl.N_z_divs+1)] = 0.0

    # Left boundary except corners: i = 0; j = 1, 2, 3, ..., gl.N_epsilon_divs - 1
    i = 0
    for j in range(1, gl.N_epsilon_divs):
        k = j*(gl.N_z_divs+1) + i
        A[k, k] = 1

        if (np.real(psi_gr[2, j, timeste]) >= sys_float_info_min and np.imag(psi_gr[2, j, timeste]) >= sys_float_info_min):
            alpha = psi_gr[1, j, timeste] / psi_gr[2, j, timeste]
            if (np.arctan(np.imag(alpha) / np.real(alpha))) < 0:
                alpha = np.abs(alpha)
        else:
            alpha = 0.0

       # A[k, k+1] = alpha
        A[k, k+1] = 0.0
    
    # Right boundary except corners: i = N_z_divs; j = 1, 2, 3, ..., gl.N_epsilon_divs - 1
    i = gl.N_z_divs
    for j in range(1, gl.N_epsilon_divs):
        k = j*(gl.N_z_divs+1) + i
        A[k, k] = 1

        if (np.real(psi_gr[gl.N_z_divs-2, j, timeste]) >= sys_float_info_min and np.imag(psi_gr[gl.N_z_divs-2, j, timeste]) >= sys_float_info_min):
            alpha = psi_gr[gl.N_z_divs-1, j, timeste] / psi_gr[gl.N_z_divs-2, j, timeste]
            if (np.arctan(np.imag(alpha) / np.real(alpha))) < 0:
                alpha = np.abs(alpha)
        else:
            alpha = 0.0
        
        A[k, k-1] = alpha
        # A[k, k-1] = 0.0


    # Bottom left corner: i = j = 0
    i = 0;  j = 0
    k = j*(gl.N_z_divs+1) + i
    A[k, k] = 1

    if (np.real(psi_gr[2, 0, timeste]) >= sys_float_info_min and np.imag(psi_gr[2, 0, timeste]) >= sys_float_info_min):
        alpha1 = psi_gr[1, 0, timeste] / psi_gr[2, 0, timeste]
        if (np.arctan(np.imag(alpha1) / np.real(alpha1))) < 0:
            alpha1 = np.abs(alpha1)
    else:
        alpha1 = 0.0
    
    if (np.real(psi_gr[0, 2, timeste]) >= sys_float_info_min and np.imag(psi_gr[0, 2, timeste]) >= sys_float_info_min):
        alpha2 = psi_gr[0, 1, timeste] / psi_gr[0, 2, timeste]
        if (np.arctan(np.imag(alpha2) / np.real(alpha2))) < 0:
            alpha2 = np.abs(alpha2)
    else:
        alpha2 = 0.0

    # A[k, k+1]                 = - 0.5 * alpha1
    # A[k, k + (gl.N_z_divs+1)] = - 0.5 * alpha2
    A[k, k+1] = 0.0
    A[k, k + (gl.N_z_divs+1)] = 0.0


    # Bottom right corner: i = N_z_divs; j = 0
    i = gl.N_z_divs;  j = 0
    k = j*(gl.N_z_divs+1) + i
    A[k, k] = 1

    if (np.real(psi_gr[gl.N_z_divs-2, 0, timeste]) >= sys_float_info_min and np.imag(psi_gr[gl.N_z_divs-2, 0, timeste]) >= sys_float_info_min):
        alpha1 = psi_gr[gl.N_z_divs-1, 0, timeste] / psi_gr[gl.N_z_divs-2, 0, timeste]
        if (np.arctan(np.imag(alpha1) / np.real(alpha1))) < 0:
            alpha1 = np.abs(alpha1)
    else:
        alpha1 = 0.0
    
    if (np.real(psi_gr[gl.N_z_divs, 2, timeste]) >= sys_float_info_min and np.imag(psi_gr[gl.N_z_divs, 2, timeste]) >= sys_float_info_min):
        alpha2 = psi_gr[gl.N_z_divs, 1, timeste] / psi_gr[gl.N_z_divs, 2, timeste]
        if (np.arctan(np.imag(alpha2) / np.real(alpha2))) < 0:
            alpha2 = np.abs(alpha2)
    else:
        alpha2 = 0.0

    # A[k, k-1] = - 0.5 * alpha1
    # A[k, k + (gl.N_z_divs+1)] = - 0.5 * alpha2
    A[k, k-1] = 0.0
    A[k, k + (gl.N_z_divs+1)] = 0.0


    # Top left corner: i = 0; j = N_epsilon_divs
    i = 0;  j = gl.N_epsilon_divs
    k = j*(gl.N_z_divs+1) + i
    A[k, k] = 1

    if (np.real(psi_gr[2, gl.N_epsilon_divs, timeste]) >= sys_float_info_min and np.imag(psi_gr[2, gl.N_epsilon_divs, timeste]) >= sys_float_info_min):
        alpha1 = psi_gr[1, gl.N_epsilon_divs, timeste] / psi_gr[2, gl.N_epsilon_divs, timeste]
        if (np.arctan(np.imag(alpha1) / np.real(alpha1))) < 0:
            alpha1  = np.abs(alpha1)
    else:
        alpha1 = 0.0
    
    if (np.real(psi_gr[0, gl.N_epsilon_divs-2, timeste]) >= sys_float_info_min and np.imag(psi_gr[0, gl.N_epsilon_divs-2, timeste]) >= sys_float_info_min):
        alpha2 = psi_gr[0, gl.N_epsilon_divs-1, timeste] / psi_gr[0, gl.N_epsilon_divs-2, timeste]
        if (np.arctan(np.imag(alpha2) / np.real(alpha2))) < 0:
            alpha2 = np.abs(alpha2)
    else:
        alpha2 = 0.0

    # A[k, k+1] = - 0.5 * alpha1
    # A[k, k - (gl.N_z_divs+1)] = - 0.5 * alpha2
    A[k, k+1] = 0.0
    A[k, k - (gl.N_z_divs+1)] = 0.0

    # Top right corner: i = N_z_divs; j = N_epsilon_divs
    i = gl.N_z_divs;  j = gl.N_epsilon_divs 
    k = j*(gl.N_z_divs+1) + i
    A[k, k] = 1

    if (np.real(psi_gr[gl.N_z_divs-2, gl.N_epsilon_divs, timeste]) >= sys_float_info_min and np.imag(psi_gr[gl.N_z_divs-2, gl.N_epsilon_divs, timeste]) >= sys_float_info_min):
        alpha1 = psi_gr[gl.N_z_divs-1, gl.N_epsilon_divs, timeste] / psi_gr[gl.N_z_divs-2, gl.N_epsilon_divs, timeste]
        if (np.arctan(np.imag(alpha1) / np.real(alpha1))) < 0:
            alpha1 = np.abs(alpha1)
    else:
        alpha1 = 0.0
    
    if (np.real(psi_gr[gl.N_z_divs, gl.N_epsilon_divs-2, timeste]) >= sys_float_info_min and np.imag(psi_gr[gl.N_z_divs, gl.N_epsilon_divs-2, timeste]) >= sys_float_info_min):
        alpha2 = psi_gr[gl.N_z_divs, gl.N_epsilon_divs-1, timeste] / psi_gr[gl.N_z_divs, gl.N_epsilon_divs-2, timeste]
        if (np.arctan(np.imag(alpha2) / np.real(alpha2))) < 0:
            alpha2 = np.abs(alpha2)
    else:
        alpha2 = 0.0

    # A[k, k-1] = - 0.5 * alpha1
    # A[k, k -(gl.N_z_divs+1)] = - 0.5 * alpha2
    A[k, k-1] = 0.0
    A[k, k -(gl.N_z_divs+1)] = 0.0

    return A, b

@njit
def Voft_OFF_imag(i, j, m):
    # this is the same as Voft_OFF() above, because there is no modification due to the imag. time propagation (delta_time --> -1j*delta_time) appearing in Voft_OFF (as there is no E-field turned on)
    ii = i - gl.K/2
    term1  = -1 / sqrt(  (j*gl.delta_epsilon)**(2*gl.lambd) + (ii*gl.delta_z)**2  ) #  - 1 / sqrt(...)
    term2 = m**2 / ( 2*(j*gl.delta_epsilon)**(2*gl.lambd) )
    return (term1 + term2)

我将其用作:

def solve_for_1timestep_imag(psi_gr, timeste):

    size = (gl.N_z_divs+1) * (gl.N_epsilon_divs+1)

    A = sparse.dok_matrix( (size, size), dtype=np.complex64)
    # b = sparse.dok_array((size, 1), dtype=np.complex64)
    b = np.zeros( (size, 1), dtype=np.complex64)

    sys_float_info_min = sys.float_info.min
    A, b = populate_A_matrix_b_vector(psi_gr, timeste, A, b, sys_float_info_min)
    ...
    ...

gl.py 文件是:

import numpy as np

Energy_1s_au = np.float64(-0.5) # -13.6 eV in atomic units of energy
m = np.float64(0.0) # magnetic Q number

omega = np.float64(0.2) 
E0 = np.float64(0.3)

N_timesteps_imag = np.int32(120)

lambd = np.float64(1.5)

delta_time = np.float64(0.01)

N_epsilon_divs = np.int32(200)
N_z_divs = np.int32(500)
K = N_z_divs # equal to the variable N_z_divs

delta_epsilon = np.float64(0.1)
delta_z = np.float64(0.1)

z_max = (N_z_divs/2) * delta_z
epsilon_range = np.linspace(0.0, N_epsilon_divs*delta_epsilon, N_epsilon_divs+1)
z_range = np.linspace(-z_max, z_max, N_z_divs+1)

填充 A 矩阵的函数是我代码的瓶颈,如果能帮助我至少提高一点速度,我将不胜感激。

谢谢!

Scipy 稀疏矩阵非常慢。对于内部使用低效 hash-tables 的 DOK 矩阵尤其如此。但是,reading/Setting 循环中稀疏矩阵的特定单元非常慢(例如,在我的机器上每次访问需要 10~15 us)。你应该像避免瘟疫一样避免这种情况。

解决此问题的一个方法是创建一个索引和值数组,并以矢量化 方式将值写入space 矩阵。 indices/values 的计算可以使用 Numba 进行优化。这是一个例子:

@nb.njit
def gen_indexed_values(psi_gr, timeste, b):
    two_lambd = 2 * gl.lambd
    oneover4_times_1over_deltazsq = 0.25 / (gl.delta_z**2)
    oneover2_times_1over_deltazsq = 0.5 / (gl.delta_z**2)

    res_size = (gl.N_z_divs-1) * (gl.N_epsilon_divs-1)
    A_idx = np.empty(res_size, dtype=np.int32)
    A_values = np.empty((3, res_size), dtype=np.complex64)
    cur = 0

    for i in range(1, gl.N_z_divs):
        for j in range(1, gl.N_epsilon_divs):
            k = j*(gl.N_z_divs+1) + i
            ratio = 1 / ( (2*gl.lambd**2) * (j*gl.delta_epsilon)**two_lambd )

            A_idx[cur] = k
            A_values[0, cur] = (-1/gl.delta_time) - ratio * (j**2 - (gl.lambd-0.5)**2 * 0.5) - oneover2_times_1over_deltazsq - \
                                0.5 * Voft_OFF_imag(i, j, gl.m)
            A_values[1, cur] = ratio * ( j**2*0.5 - (gl.lambd-1)*j*0.5 )
            A_values[2, cur] = ratio * ( j**2*0.5 + (gl.lambd-1)*j*0.5 )

            b[k] = psi_gr[i,  j, timeste] * ( (-1/gl.delta_time) + ratio*j**2 - (gl.lambd-0.5)**2*0.5 + oneover2_times_1over_deltazsq + \
                                0.5 * Voft_OFF_imag(i, j, gl.m)  ) + \
                   psi_gr[i+1, j, timeste] * (-oneover4_times_1over_deltazsq) + \
                   psi_gr[i-1, j, timeste] * (-oneover4_times_1over_deltazsq) + \
                   psi_gr[i,   j+1, timeste] * ( -ratio * (j**2 * 0.5 - (gl.lambd-1) * j * 0.5)   ) + \
                   psi_gr[i,   j-1, timeste] * ( -ratio * (j**2 * 0.5 + (gl.lambd-1) * j * 0.5)   )

            cur += 1

    return A_idx, A_values

def populate_A_matrix_b_vector(psi_gr, timeste, A, b, sys_float_info_min):
    # Utilities:
    two_lambd = 2 * gl.lambd
    oneover4_times_1over_deltazsq = 0.25 / (gl.delta_z**2)
    oneover2_times_1over_deltazsq = 0.5 / (gl.delta_z**2)

    # For the interior nodes:
    A_idx, A_values = gen_indexed_values(psi_gr, timeste, b)
    A[A_idx, A_idx] = A_values[0,:]
    A[A_idx, A_idx+1] = oneover4_times_1over_deltazsq
    A[A_idx, A_idx-1] = oneover4_times_1over_deltazsq
    A[A_idx, A_idx + (gl.N_z_divs+1)] = A_values[1,:]
    A[A_idx, A_idx - (gl.N_z_divs+1)] = A_values[2,:]

    # [...]

这比我的机器快 22 倍。使用另一种稀疏矩阵可能有助于提高性能。