优化 NumPy 中的部分进行的矢量化操作

optimizing vectorized operations made by sections in NumPy

长话短说,我需要使用矩阵本身的值对 2D 矩阵进行向量运算,进行数千次迭代,但由于我在下面解释的原因,我需要在多个部分进行,我想要通过仍然获得最佳性能和可读性来了解最好的方法。

我正在求解拉普拉斯方程,以便为计算空气动力学模拟生成网格。

为此,假设我有一个名为 X 的二维矩阵,形状为 (M, N),其中 M 和 N 分别是行数和列数,我需要获取每个矩阵的值"coordinates" (i, j) 的内部节点,受到其邻居的影响,指向 (i+1, j) (i-1, j) (i, j+1) (i, j-1)。以下一个等式为例:

X[i, j] = (X[i+1, j] - X[i-1, j] + X[i, j+1] - X[i, j-1]) / 4

代码运行数次迭代,以数十万次为单位,在每次迭代中我需要遍历整个矩阵计算每个内部节点。上面的等式说明 计算是在矩阵本身中进行的,并且 X[i-1, j]X[i, j-1] 的值是当前迭代中已经计算的值。

所以,这就是所讨论问题的背景,现在是我正在编写的代码。作为一个新手,我开始使用两个嵌套循环的明显但不是最佳的方法,一个用于行,一个用于列,它们已经在 while 循环(迭代次数)内:

while current_it < it_max:
    for i in range(1, M-1):
        for j in range(1, N-1):
            X[i, j] = (X[i+1, j] - X[i-1, j] + X[i, j+1] - X[i, j-1]) / 4

这很有效,对于较小的矩阵,它的执行时间相对较短,大约 5 分钟,我知道执行时间已经很长了,但这并不是真正的问题。但我需要大网格,例如大小为 1200 x 400 的网格,在这种情况下,执行时间呈指数级增长,需要 DAYS 才能求解网格,这不再是负担得起的。

感谢 this question,我意识到我可以向量化方程并摆脱嵌套的 for 循环,所以现在我的代码看起来像

while current_it < it_max:
    # replacements of i and j
    #  j or  i      -->   1:-1
    # (j or  i) + 1 -->   2:
    # (j or  i) - 1 -->   :-2
    X[1:-1, 1:-1] = (X[2:, 1:-1] - X[:-2, 1:-1] + X[1:-1, 2:] - X[1:-1, :-2]) / 4

这代表了执行时间的巨大改进,一个在经典方法中需要 3 天才能生成的网格现在可能只需要 5 分钟。

我现在遇到的问题是我失去了为当前迭代获取 (i-1)(j-1) 值的能力,这使代码执行了我怀疑的更多迭代需要。

我的解决方案是将矩阵分成几部分,一次计算每一部分。

while current_it < it_max:
    # 1st piece [i, 1 : lim_1]
    # 2nd piece [i, lim_1 :]
    X[1:-1, 1:lim_1] = (X[2:, 1:lim_1] - X[:-2, 1:lim_1] \
            + X[1:-1, 2:lim_1 + 1] - X[1:-1, :lim_1 - 1]) / 4
    X[1:-1, lim_1:-1] = (X[2:, lim_1:-1] - X[:-2, lim_1:-1] \
            + X[1:-1, lim_1 + 1:] - X[1:-1, lim_1 - 1:-2]) / 4

但我知道复制粘贴是不好的做法,而且代码行数增长得很快,因为我在 ij 两个方向都需要多个部分。

重新安排最终代码以获得最佳性能和可读性的最佳方式是什么。

这是一种可以真正受益于使用 numba 的问题。对于下面的设置,在不牺牲可读性的情况下,我获得的速度几乎是 numpy 解决方案的两倍。

import numpy as np
from numba import jit

X = np.random.randn(100, 100)
it_max = 1000

@jit
def loopy(X):
  N, M = X.shape
  for itr in range(it_max):
    for i in range(1, M-1):
      for j in range(1, N-1):
        X[i, j] = (X[i+1, j] - X[i-1, j] + X[i, j+1] - X[i, j-1]) / 4
  return X


def vectory(X):
  for itr in range(it_max):
    # replacements of i and j
    #  j or  i      -->   1:-1
    # (j or  i) + 1 -->   2:
    # (j or  i) - 1 -->   :-2
    X[1:-1, 1:-1] = (X[2:, 1:-1] - X[:-2, 1:-1] + X[1:-1, 2:] - X[1:-1, :-2]) / 4
  return X


Xc = X.copy()
%timeit loopy(Xc)   # 10 loops, best of 3: 25.1 ms per loop
Xc = X.copy()
%timeit vectory(Xc) # 10 loops, best of 3: 43.1 ms per loop