python 中的 FFT 循环加速(使用 `np.einsum`)

Loop speed up of FFT in python (with `np.einsum`)

问题: 我想加快我的 python 循环,其中包含大量乘积和求和 np.einsum,但我也愿意任何其他解决方案。

我的函数采用形状为 (n,n,3) 的矢量配置 S(我的情况:n=72)并对 N*N 点的相关函数进行傅里叶变换。相关函数定义为每个向量与其他向量的乘积。这乘以向量位置的余弦函数乘以 kx 和 ky 值。每个位置 i,j 最后求和得到一个点在 k-space p,m:

def spin_spin(S,N):
    n= len(S)
    conf = np.reshape(S,(n**2,3))
    chi = np.zeros((N,N))
    kx = np.linspace(-5*np.pi/3,5*np.pi/3,N)
    ky = np.linspace(-3*np.pi/np.sqrt(3),3*np.pi/np.sqrt(3),N)

    x=np.reshape(triangular(n)[0],(n**2))
    y=np.reshape(triangular(n)[1],(n**2))
    for p in range(N):
        for m in range(N):
            for i in range(n**2):
                for j in range(n**2):        
                    chi[p,m] += 2/(n**2)*np.dot(conf[i],conf[j])*np.cos(kx[p]*(x[i]-x[j])+ ky[m]*(y[i]-y[j]))
    return(chi,kx,ky)

我的问题是我需要大约 100*100 个点,用 kx*ky 表示,并且循环需要很多小时才能完成具有 72*72 向量的格的这项工作。 计算次数:72*72*72*72*100*100 由于我的三角形网格,我无法使用 numpy 的内置 FFT,因此我需要一些其他选项来降低计算成本。

我的想法: 首先我认识到将配置重塑为向量列表而不是矩阵可以降低计算成本。此外,我使用了 numba 包,它也降低了成本,但它仍然太慢了。我发现计算这类对象的一个​​好方法是 np.einsum 函数。计算每个向量与每个向量的乘积是通过以下方式完成的:

np.einsum('ij,kj -> ik',np.reshape(S,(72**2,3)),np.reshape(S,(72**2,3)))

棘手的部分是计算 np.cos 中的项。在这里,我想计算形状列表 (100,1) 与向量位置(例如 np.shape(x)=(72**2,1))之间的乘积。特别是我真的不知道如何用np.einsum实现x方向和y方向的距离。

重现代码(你可能不需要这个):首先你需要一个矢量配置。您可以简单地使用 np.ones((72,72,3) 或以随机向量为例:

def spherical_to_cartesian(r, theta, phi):
    '''Convert spherical coordinates (physics convention) to cartesian coordinates'''
    sin_theta = np.sin(theta)
    x = r * sin_theta * np.cos(phi)
    y = r * sin_theta * np.sin(phi)
    z = r * np.cos(theta)

    return x, y, z # return a tuple

def random_directions(n, r):
    '''Return ``n`` 3-vectors in random directions with radius ``r``'''
    out = np.empty(shape=(n,3), dtype=np.float64)

    for i in range(n):
        # Pick directions randomly in solid angle
        phi = random.uniform(0, 2*np.pi)
        theta = np.arccos(random.uniform(-1, 1))
        # unpack a tuple
        x, y, z = spherical_to_cartesian(r, theta, phi)
        out[i] = x, y, z

    return out
S = np.reshape(random_directions(72**2,1),(72,72,3))

(本例中的整形需要在函数 spin_spin 中整形为 (72**2,3) 形状。)

对于矢量的位置,我使用由

定义的三角网格
def triangular(nsize):
    '''Positional arguments of the spin configuration'''

    X=np.zeros((nsize,nsize))
    Y=np.zeros((nsize,nsize))
    for i in range(nsize):
        for j in range(nsize):
            X[i,j]+=1/2*j+i
            Y[i,j]+=np.sqrt(3)/2*j
    return(X,Y)

这是加快速度的一种方法。我没有开始使用 np.einsum 因为对你的循环进行一些微调就足够了。

拖慢代码速度的主要原因是对同一事物的冗余重新计算。这里的嵌套循环是肇事者:

for p in range(N):
        for m in range(N):
            for i in range(n**2):
                for j in range(n**2):        
                    chi[p,m] += 2/(n**2)*np.dot(conf[i],conf[j])*np.cos(kx[p]*(x[i]-x[j])+ ky[m]*(y[i]-y[j]))

其中包含大量冗余,多次重新计算向量运算。

考虑 np.dot(...):这个计算完全独立于点 kx 和 ky。但只有点 kx 和 ky 需要用 m 和 n 进行索引。因此,您可以 运行 所有 i 和 j 的点积一次,并保存结果,而不是为每个 m,n 重新计算(这将是 10,000 次!)。

在类似的方法中,不需要在格子中的每个点重新计算向量之间的差异。在每个点你计算每个向量距离,当所有需要的是计算一次向量距离并且仅仅将这个结果乘以每个格点。

因此,在修复循环并使用索引为 (i,j) 的字典作为存储所有值的键之后,您可以在循环 i、j 期间查找相关值。这是我的代码:

def spin_spin(S, N):
    n = len(S)
    conf = np.reshape(S,(n**2, 3))

    chi = np.zeros((N, N))
    kx = np.linspace(-5*np.pi/3, 5*np.pi/3, N)
    ky = np.linspace(-3*np.pi/np.sqrt(3), 3*np.pi/np.sqrt(3), N)

    # Minor point; no need to use triangular twice
    x, y = triangular(n)
    x, y = np.reshape(x,(n**2)), np.reshape(y,(n**2))

    # Build a look-up for all the dot products to save calculating them many times
    dot_prods = dict()
    x_diffs, y_diffs = dict(), dict()
    for i, j in itertools.product(range(n**2), range(n**2)):
        dot_prods[(i, j)] = np.dot(conf[i], conf[j])
        x_diffs[(i, j)], y_diffs[(i, j)] = x[i] - x[j], y[i] - y[j]    

    # Minor point; improve syntax by converting nested for loops to one line
    for p, m in itertools.product(range(N), range(N)):
        for i, j in itertools.product(range(n**2), range(n**2)):
            # All vector operations are replaced by look ups to the dictionaries defined above
            chi[p, m] += 2/(n**2)*dot_prods[(i, j)]*np.cos(kx[p]*(x_diffs[(i, j)]) + ky[m]*(y_diffs[(i, j)]))
    return(chi, kx, ky)

我目前正在运行使用您提供的尺寸在一台像样的机器上进行此操作,并且 i,j 的循环在两分钟内完成。这只需要发生一次;那么它只是对 m, n 的循环。每一个都需要大约 90 秒,所以仍然需要 2-3 小时 运行 时间。我欢迎任何有关如何优化 cos 计算以加快计算速度的建议!

我实现了优化的目标,但为了提供速度感,i、j 的循环需要 2 分钟,这样就减少了 运行s 9,999 次!

优化的 Numba 实现

您代码中的主要问题是使用极 数据重复调用外部 BLAS 函数 np.dot。在这段代码中,只计算一次会更有意义,但如果您必须在循环中进行此计算,请编写一个 Numba 实现。

优化函数(暴力破解)

import numpy as np
import numba as nb

@nb.njit(fastmath=True,error_model="numpy",parallel=True)
def spin_spin(S,N):
    n= len(S)
    conf = np.reshape(S,(n**2,3))
    chi = np.zeros((N,N))
    kx = np.linspace(-5*np.pi/3,5*np.pi/3,N).astype(np.float32)
    ky = np.linspace(-3*np.pi/np.sqrt(3),3*np.pi/np.sqrt(3),N).astype(np.float32)

    x=np.reshape(triangular(n)[0],(n**2)).astype(np.float32)
    y=np.reshape(triangular(n)[1],(n**2)).astype(np.float32)

    #precalc some values
    fact=nb.float32(2/(n**2))
    conf_dot=np.dot(conf,conf.T).astype(np.float32)

    for p in nb.prange(N):
        for m in range(N):
            #accumulating on a scalar is often beneficial
            acc=nb.float32(0)
            for i in range(n**2):
                for j in range(n**2):        
                    acc+= conf_dot[i,j]*np.cos(kx[p]*(x[i]-x[j])+ ky[m]*(y[i]-y[j]))
            chi[p,m]=fact*acc

    return(chi,kx,ky)

优化函数(去除冗余计算)

做了很多多余的计算。这是有关如何删除它们的示例。这也是双精度计算的版本。

@nb.njit()
def precalc(S):
    #There may not be all redundancies removed
    n= len(S)
    conf = np.reshape(S,(n**2,3))
    conf_dot=np.dot(conf,conf.T)
    x=np.reshape(triangular(n)[0],(n**2))
    y=np.reshape(triangular(n)[1],(n**2))

    x_s=set()
    y_s=set()
    for i in range(n**2):
        for j in range(n**2):
            x_s.add((x[i]-x[j]))
            y_s.add((y[i]-y[j]))

    x_arr=np.sort(np.array(list(x_s)))
    y_arr=np.sort(np.array(list(y_s)))


    conf_dot_sel=np.zeros((x_arr.shape[0],y_arr.shape[0]))
    for i in range(n**2):
        for j in range(n**2):
            ii=np.searchsorted(x_arr,x[i]-x[j])
            jj=np.searchsorted(y_arr,y[i]-y[j])
            conf_dot_sel[ii,jj]+=conf_dot[i,j]

    return x_arr,y_arr,conf_dot_sel

@nb.njit(fastmath=True,error_model="numpy",parallel=True)
def spin_spin_opt_2(S,N):
    chi = np.empty((N,N))
    n= len(S)

    kx = np.linspace(-5*np.pi/3,5*np.pi/3,N)
    ky = np.linspace(-3*np.pi/np.sqrt(3),3*np.pi/np.sqrt(3),N)

    x_arr,y_arr,conf_dot_sel=precalc(S)
    fact=2/(n**2)
    for p in nb.prange(N):
        for m in range(N):
            acc=nb.float32(0)
            for i in range(x_arr.shape[0]):
                for j in range(y_arr.shape[0]):        
                    acc+= fact*conf_dot_sel[i,j]*np.cos(kx[p]*x_arr[i]+ ky[m]*y_arr[j])
            chi[p,m]=acc

    return(chi,kx,ky)

@nb.njit()
def precalc(S):
    #There may not be all redundancies removed
    n= len(S)
    conf = np.reshape(S,(n**2,3))
    conf_dot=np.dot(conf,conf.T)
    x=np.reshape(triangular(n)[0],(n**2))
    y=np.reshape(triangular(n)[1],(n**2))

    x_s=set()
    y_s=set()
    for i in range(n**2):
        for j in range(n**2):
            x_s.add((x[i]-x[j]))
            y_s.add((y[i]-y[j]))

    x_arr=np.sort(np.array(list(x_s)))
    y_arr=np.sort(np.array(list(y_s)))


    conf_dot_sel=np.zeros((x_arr.shape[0],y_arr.shape[0]))
    for i in range(n**2):
        for j in range(n**2):
            ii=np.searchsorted(x_arr,x[i]-x[j])
            jj=np.searchsorted(y_arr,y[i]-y[j])
            conf_dot_sel[ii,jj]+=conf_dot[i,j]

    return x_arr,y_arr,conf_dot_sel

@nb.njit(fastmath=True,error_model="numpy",parallel=True)
def spin_spin_opt_2(S,N):
    chi = np.empty((N,N))
    n= len(S)

    kx = np.linspace(-5*np.pi/3,5*np.pi/3,N)
    ky = np.linspace(-3*np.pi/np.sqrt(3),3*np.pi/np.sqrt(3),N)

    x_arr,y_arr,conf_dot_sel=precalc(S)
    fact=2/(n**2)
    for p in nb.prange(N):
        for m in range(N):
            acc=nb.float32(0)
            for i in range(x_arr.shape[0]):
                for j in range(y_arr.shape[0]):        
                    acc+= fact*conf_dot_sel[i,j]*np.cos(kx[p]*x_arr[i]+ ky[m]*y_arr[j])
            chi[p,m]=acc

    return(chi,kx,ky)

计时

#brute-force
%timeit res=spin_spin(S,100)
#48 s ± 671 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

#new version
%timeit res_2=spin_spin_opt_2(S,100)
#5.33 s ± 59.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit res_2=spin_spin_opt_2(S,1000)
#1min 23s ± 2.43 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

编辑(SVML 检查)

import numba as nb
import numpy as np

@nb.njit(fastmath=True)
def foo(n):
    x   = np.empty(n*8, dtype=np.float64)
    ret = np.empty_like(x)
    for i in range(ret.size):
            ret[i] += np.cos(x[i])
    return ret

foo(1000)

if 'intel_svmlcc' in foo.inspect_llvm(foo.signatures[0]):
    print("found")
else:
    print("not found")

#found

如果有 not found 阅读 this link. 它应该可以在 Linux 和 Windows 上运行,但我还没有在 macOS 上测试它。