将函数应用于没有循环的多维 numpy 数组
Applying functions to multidimensional numpy arrays without loops
我正在使用 numpy 处理栅格数据(从 GDAL 读取后),它表示高程。我的目标是使用 numpy 计算阵列中每个像素的水流方向,主要根据给定像素与其 8 个相邻像素之间的海拔差异来确定。
我已经实现了滚动 window 技术来生成每个像素及其邻居的多维数组,其工作原理如下:
def rolling_window(array, window_size):
itemsize = array.itemsize
shape = (array.shape[0] - window_size + 1,
array.shape[1] - window_size + 1,
window_size, window_size)
strides = (array.shape[1] * itemsize, itemsize,
array.shape[1] * itemsize, itemsize)
return np.lib.stride_tricks.as_strided(array, shape=shape, strides=strides)
array = np.arange(100)
array = array.reshape(10, 10)
w = rolling_window(array, 3)
# produces array with shape (8, 8, 3, 3) - edge cases are not currently dealt with.
因此,一系列 3 x 3 阵列,以 1,1 处的研究像素为中心,每个都在栅格阵列的另一个维度内 "rows" 例如,从输入的一个像素,表示它的数组可能如下所示,其中像素值 4 是研究像素,其他值是它的直接邻居。
array([[[[ 0, 1, 2],
[ 3, 4, 5],
[ 6, 7, 8]]]])
我当前使用此多维数组的方法的简化版本是以下函数:
def flow_dir(array):
# Value to assign output based on element index.
flow_idx_dict = {0: 32,
1: 64,
2: 128,
3: 16,
5: 1,
6: 8,
7: 4,
8: 2}
# Generates the rolling window array as mentioned above.
w = rolling_window(array, 3)
# Iterate though each pixel array.
for x, i in enumerate(w, 1):
for y, j in enumerate(i, 1):
j = j.flatten()
# Centre pixel value after flattening.
centre = j[4]
# Some default values.
idx = 4
max_drop = 0
# Iterate over pixel values in array.
for count, px in enumerate(j):
# Calculate difference between centre pixel and neighbour.
drop = centre - px
# Find the maximum difference pixel index.
if count != 4:
if drop > max_drop:
max_drop = drop
idx = count
# Assign a value from a dict, matching index to flow direction category.
value = flow_idx_dict[idx]
# Update each pixel in the input array with the flow direction.
array[x, y] = value
return array
可以理解,所有这些 for 循环和 if 语句都非常慢。我知道必须有一个矢量化的 numpy 方法来做到这一点,但我正在努力寻找我需要的确切功能,或者可能不明白如何正确地实现它们。我尝试了 np.apply_along_axis、np.where、np.nditer 等,但到目前为止都无济于事。我认为我需要的是:
一种将函数应用于由滚动 window 产生的每个像素阵列而不使用 for 循环访问它们的方法。
找到最大掉落索引值,不使用 if 语句并枚举。
能够批量更新输入数组,而不是单个元素。
我觉得这里可以避免滚动windows;与 NxNx3x3 相比,在 NxN 数组上进行矢量化更容易且可读性更高。
考虑这个数据:
array = np.array([[78, 72, 69, 71, 58, 49],
[74, 67, 56, 49, 46, 50],
[69, 53, 44, 37, 38, 48],
[64, 58, 55, 22, 33, 24],
[68, 61, 47, 21, 16, 19],
[74, 53, 34, 12, 11, 12]])
N=6
首先,计算 8 个梯度并这样编码:
gradient = np.empty((8,N-2,N-2),dtype=np.float)
code = np.empty(8,dtype=np.int)
for k in range(8):
theta = -k*np.pi/4
code[k] = 2**k
j, i = np.int(1.5*np.cos(theta)),-np.int(1.5*np.sin(theta))
d = np.linalg.norm([i,j])
gradient[k] = (array[1+i: N-1+i,1+j: N-1+j]-array[1: N-1,1: N-1])/d
速度快,因为外部循环很少(8)。
(-gradient).argmax(axis=0)
为每个像素指定流向。
最后,take
代码:
direction = (-gradient).argmax(axis=0)
result = code.take(direction)
结果:
array([[ 2, 2, 4, 4],
[ 1, 2, 4, 8],
[128, 1, 2, 4],
[ 2, 1, 4, 4]])
我正在使用 numpy 处理栅格数据(从 GDAL 读取后),它表示高程。我的目标是使用 numpy 计算阵列中每个像素的水流方向,主要根据给定像素与其 8 个相邻像素之间的海拔差异来确定。
我已经实现了滚动 window 技术来生成每个像素及其邻居的多维数组,其工作原理如下:
def rolling_window(array, window_size):
itemsize = array.itemsize
shape = (array.shape[0] - window_size + 1,
array.shape[1] - window_size + 1,
window_size, window_size)
strides = (array.shape[1] * itemsize, itemsize,
array.shape[1] * itemsize, itemsize)
return np.lib.stride_tricks.as_strided(array, shape=shape, strides=strides)
array = np.arange(100)
array = array.reshape(10, 10)
w = rolling_window(array, 3)
# produces array with shape (8, 8, 3, 3) - edge cases are not currently dealt with.
因此,一系列 3 x 3 阵列,以 1,1 处的研究像素为中心,每个都在栅格阵列的另一个维度内 "rows" 例如,从输入的一个像素,表示它的数组可能如下所示,其中像素值 4 是研究像素,其他值是它的直接邻居。
array([[[[ 0, 1, 2],
[ 3, 4, 5],
[ 6, 7, 8]]]])
我当前使用此多维数组的方法的简化版本是以下函数:
def flow_dir(array):
# Value to assign output based on element index.
flow_idx_dict = {0: 32,
1: 64,
2: 128,
3: 16,
5: 1,
6: 8,
7: 4,
8: 2}
# Generates the rolling window array as mentioned above.
w = rolling_window(array, 3)
# Iterate though each pixel array.
for x, i in enumerate(w, 1):
for y, j in enumerate(i, 1):
j = j.flatten()
# Centre pixel value after flattening.
centre = j[4]
# Some default values.
idx = 4
max_drop = 0
# Iterate over pixel values in array.
for count, px in enumerate(j):
# Calculate difference between centre pixel and neighbour.
drop = centre - px
# Find the maximum difference pixel index.
if count != 4:
if drop > max_drop:
max_drop = drop
idx = count
# Assign a value from a dict, matching index to flow direction category.
value = flow_idx_dict[idx]
# Update each pixel in the input array with the flow direction.
array[x, y] = value
return array
可以理解,所有这些 for 循环和 if 语句都非常慢。我知道必须有一个矢量化的 numpy 方法来做到这一点,但我正在努力寻找我需要的确切功能,或者可能不明白如何正确地实现它们。我尝试了 np.apply_along_axis、np.where、np.nditer 等,但到目前为止都无济于事。我认为我需要的是:
一种将函数应用于由滚动 window 产生的每个像素阵列而不使用 for 循环访问它们的方法。
找到最大掉落索引值,不使用 if 语句并枚举。
能够批量更新输入数组,而不是单个元素。
我觉得这里可以避免滚动windows;与 NxNx3x3 相比,在 NxN 数组上进行矢量化更容易且可读性更高。
考虑这个数据:
array = np.array([[78, 72, 69, 71, 58, 49],
[74, 67, 56, 49, 46, 50],
[69, 53, 44, 37, 38, 48],
[64, 58, 55, 22, 33, 24],
[68, 61, 47, 21, 16, 19],
[74, 53, 34, 12, 11, 12]])
N=6
首先,计算 8 个梯度并这样编码:
gradient = np.empty((8,N-2,N-2),dtype=np.float)
code = np.empty(8,dtype=np.int)
for k in range(8):
theta = -k*np.pi/4
code[k] = 2**k
j, i = np.int(1.5*np.cos(theta)),-np.int(1.5*np.sin(theta))
d = np.linalg.norm([i,j])
gradient[k] = (array[1+i: N-1+i,1+j: N-1+j]-array[1: N-1,1: N-1])/d
速度快,因为外部循环很少(8)。
(-gradient).argmax(axis=0)
为每个像素指定流向。
最后,take
代码:
direction = (-gradient).argmax(axis=0)
result = code.take(direction)
结果:
array([[ 2, 2, 4, 4],
[ 1, 2, 4, 8],
[128, 1, 2, 4],
[ 2, 1, 4, 4]])