将函数应用于没有循环的多维 numpy 数组

Applying functions to multidimensional numpy arrays without loops

我正在使用 numpy 处理栅格数据(从 GDAL 读取后),它表示高程。我的目标是使用 numpy 计算阵列中每个像素的水流方向,主要根据给定像素与其 8 个相邻像素之间的海拔差异来确定。

我已经实现了滚动 window 技术来生成每个像素及其邻居的多维数组,其工作原理如下:

def rolling_window(array, window_size):
    itemsize = array.itemsize
    shape = (array.shape[0] - window_size + 1,
             array.shape[1] - window_size + 1,
             window_size, window_size)
    strides = (array.shape[1] * itemsize, itemsize,
               array.shape[1] * itemsize, itemsize)
    return np.lib.stride_tricks.as_strided(array, shape=shape, strides=strides)

array = np.arange(100)
array = array.reshape(10, 10)
w = rolling_window(array, 3)

# produces array with shape (8, 8, 3, 3) - edge cases are not currently dealt with.

因此,一系列 3 x 3 阵列,以 1,1 处的研究像素为中心,每个都在栅格阵列的另一个维度内 "rows" 例如,从输入的一个像素,表示它的数组可能如下所示,其中像素值 4 是研究像素,其他值是它的直接邻居。

array([[[[ 0,  1,  2],
         [ 3,  4,  5],
         [ 6,  7,  8]]]])

我当前使用此多维数组的方法的简化版本是以下函数:

def flow_dir(array):

    # Value to assign output based on element index.
    flow_idx_dict = {0: 32,
                     1: 64,
                     2: 128,
                     3: 16,
                     5: 1,
                     6: 8,
                     7: 4,
                     8: 2}

    # Generates the rolling window array as mentioned above.
    w = rolling_window(array, 3)

    # Iterate though each pixel array.
    for x, i in enumerate(w, 1):
        for y, j in enumerate(i, 1):
            j = j.flatten()

            # Centre pixel value after flattening.
            centre = j[4]

            # Some default values.
            idx = 4
            max_drop = 0

            # Iterate over pixel values in array.
            for count, px in enumerate(j):

                # Calculate difference between centre pixel and neighbour.
                drop = centre - px

                # Find the maximum difference pixel index.
                if count != 4:
                    if drop > max_drop:
                        max_drop = drop
                        idx = count

            # Assign a value from a dict, matching index to flow direction category.
            value = flow_idx_dict[idx]

            # Update each pixel in the input array with the flow direction.
            array[x, y] = value
    return array

可以理解,所有这些 for 循环和 if 语句都非常慢。我知道必须有一个矢量化的 numpy 方法来做到这一点,但我正在努力寻找我需要的确切功能,或者可能不明白如何正确地实现它们。我尝试了 np.apply_along_axis、np.where、np.nditer 等,但到目前为止都无济于事。我认为我需要的是:

  1. 一种将函数应用于由滚动 window 产生的每个像素阵列而不使用 for 循环访问它们的方法。

  2. 找到最大掉落索引值,不使用 if 语句并枚举。

  3. 能够批量更新输入数组,而不是单个元素。

我觉得这里可以避免滚动windows;与 NxNx3x3 相比,在 NxN 数组上进行矢量化更容易且可读性更高。

考虑这个数据:

array = np.array([[78, 72, 69, 71, 58, 49],
       [74, 67, 56, 49, 46, 50],
       [69, 53, 44, 37, 38, 48],
       [64, 58, 55, 22, 33, 24],
       [68, 61, 47, 21, 16, 19],
       [74, 53, 34, 12, 11, 12]])
N=6

首先,计算 8 个梯度并这样编码:

gradient = np.empty((8,N-2,N-2),dtype=np.float)
code = np.empty(8,dtype=np.int)
for k in range(8):
    theta = -k*np.pi/4
    code[k] = 2**k
    j, i = np.int(1.5*np.cos(theta)),-np.int(1.5*np.sin(theta))
    d = np.linalg.norm([i,j])
    gradient[k] = (array[1+i: N-1+i,1+j: N-1+j]-array[1: N-1,1: N-1])/d

速度快,因为外部循环很少(8)。 (-gradient).argmax(axis=0) 为每个像素指定流向。

最后,take 代码:

direction = (-gradient).argmax(axis=0)
result = code.take(direction)

结果:

array([[  2,   2,   4,   4],
       [  1,   2,   4,   8],
       [128,   1,   2,   4],
       [  2,   1,   4,   4]])