矩阵处理更快
faster processing with matrix
我有一个输入数据为1028*24*24*16的数组。当我 运行 下面的代码时,它运行得非常慢。我怎样才能加快速度?谢谢
(我想从一个大数组中得到3*3矩阵。)
import itertools,math,time,random
import numpy as np
start=time.time()
def ls(x):
x_p = x / np.sum(x)
return np.max(x_p)
inputs = np.random.rand(1028, 24, 24, 16)
b, r, c, ch = inputs.shape[0], inputs.shape[1], inputs.shape[2], inputs.shape[3]
inputs = np.transpose(inputs, (0, 3, 1, 2))
inputs = np.reshape(inputs, (b*ch, r, c))
s=2
r=3
num_r=9
num_c=9
ke = np.zeros((b*ch, num_r, num_c), dtype=np.float32)
for i in range(num_r):
for j in range(num_c):
outs = np.array(list(map(ls, inputs[:, i*s:i*s+r, j*s:j*s+r])))
ke[:, i, j] = outs
ke = np.reshape(ke, (b, ch, num_r, num_c))
ke = np.transpose(ke, (0, 2, 3, 1))
print(time.time() - start)
我认为问题主要在于应该向量化的函数 ls 和花费你时间的列表/映射
import itertools,math,time,random
import numpy as np
start=time.time()
def ls(x):
x_p = x / np.sum(np.sum(x, axis=1), axis=1)[:,None,None]
return np.max(np.max(x_p,axis=1),axis=1)
inputs = np.random.rand(1028, 24, 24, 16)
b, r, c, ch = inputs.shape[0], inputs.shape[1], inputs.shape[2], inputs.shape[3]
inputs = np.transpose(inputs, (0, 3, 1, 2))
inputs = np.reshape(inputs, (b*ch, r, c))
s=2
r=3
num_r=9
num_c=9
ke = np.zeros((b*ch, num_r, num_c), dtype=np.float32)
for i in range(num_r):
print(i)
for j in range(num_c):
# outs = np.array(list(map(ls, inputs[:, i*s:i*s+r, j*s:j*s+r])))
outs = ls(inputs[:, i*s:i*s+r, j*s:j*s+r])
ke[:, i, j] = outs
ke = np.reshape(ke, (b, ch, num_r, num_c))
ke = np.transpose(ke, (0, 2, 3, 1))
print(time.time() - start)
原始代码可以包装(经过一些修饰)到以下函数中:
import numpy as np
def foo0(arr, s=2, r=3, nr=9, nc=9):
forward_axes = (0, 3, 1, 2)
backward_axes = (0, 2, 3, 1)
b, r, c, ch = arr.shape
arr = np.transpose(arr, forward_axes)
arr = np.reshape(arr, (b * ch, r, c))
result = np.zeros((b * ch, nr, nc), dtype=np.float32)
for i in range(nr):
for j in range(nc):
result[:, i, j] = np.fromiter(
map(
lambda x: np.max(x / np.sum(x)),
arr[:, i * s:i * s + r, j * s:j * s + r]),
dtype=np.float32)
result = np.reshape(result, (b, ch, nr, nc))
result = np.transpose(result, backward_axes)
return result
虽然显式循环表明可以在此处应用 Numba 以获得一些 low-hanging 水果,但不幸的是,如果不与 Python 对象交互,则无法轻松装饰该函数,从而大大减少了 speed-up.
幸运的是,核心计算可以很容易地向量化,只要 nr
和 nc
足够小,这种优化就足够了:
def foo1(arr, s=2, r=3, nr=9, nc=9):
forward_axes = (0, 3, 1, 2)
backward_axes = (0, 2, 3, 1)
b, r, c, ch = arr.shape
arr = np.transpose(arr, forward_axes)
arr = np.reshape(arr, (b * ch, r, c))
result = np.zeros((b * ch, nr, nc), dtype=np.float32)
for i in range(nr):
for j in range(nc):
x = arr[:, i * s:i * s + r, j * s:j * s + r]
result[:, i, j] = np.max(x, (-1, -2)) / np.sum(x, (-1, -2))
result = np.reshape(result, (b, ch, nr, nc))
result = np.transpose(result, backward_axes)
return result
(上面的 foo1()
本质上等同于 并进行了一些额外的优化。)
注意max(x / k)
和max(x) / k
是一样的但是分割数大大减少了
实际上,转置和整形,虽然它可能有助于提高计算速度,但并不是真正必要的:
def foo2(arr, s=2, r=3, nr=9, nc=9):
b, r, c, ch = arr.shape
result = np.zeros((b, nr, nc, ch), dtype=np.float32)
for i in range(nr):
for j in range(nc):
x = arr[:, i * s:i * s + r, j * s:j * s + r, :]
result[:, i, j, :] = np.max(x, (1, 2)) / np.sum(x, (1, 2))
return result
以上内容在 Numba 中翻译起来更简单,但小 nr
/nc
的速度增益将是最小的(与部分矢量化方法相比):
import numba as nb
@nb.njit
def sum_nb(arr):
result = 0
for x in arr:
result += x
return result
@nb.njit
def max_nb(arr):
result = arr[0]
for x in arr[1:]:
if x > result:
result = x
return result
@nb.njit
def _sum_max(arr):
b, r, c, ch = arr.shape
res = np.empty((b, ch), dtype=arr.dtype)
for i in range(b):
for j in range(ch):
x = arr[i, :, :, j].ravel()
res[i, j] = max_nb(x) / sum_nb(x)
return res
@nb.njit
def foo3(arr, s=2, r=3, nr=9, nc=9):
b, r, c, ch = arr.shape
result = np.zeros((b, nr, nc, ch), dtype=np.float32)
for i in range(nr):
for j in range(nc):
result[:, i, j, :] = _sum_max(arr[:, i * s:i * s + r, j * s:j * s + r, :])
return result
另一种选择是将 Numba-incompatible 代码保留在主循环之外:
@nb.njit(fastmath=True)
def _foo4(arr, result, s, r, nr, nc):
bch, nr, nc = result.shape
for i in range(nr):
for j in range(nc):
for k in range(bch):
x = arr[k, i * s:i * s + r, j * s:j * s + r].ravel()
result[k, i, j] = max_nb(x) / sum_nb(x)
return result
def foo4(arr, s=2, r=3, nr=9, nc=9):
forward_axes = (0, 3, 1, 2)
backward_axes = (0, 2, 3, 1)
b, r, c, ch = arr.shape
arr = np.transpose(arr, forward_axes)
arr = np.reshape(arr, (b * ch, r, c))
result = np.empty((b * ch, nr, nc))
result = _foo4(arr, result, s, r, nr, nc)
result = np.reshape(result, (b, ch, nr, nc))
result = np.transpose(result, backward_axes)
return result
但同样速度增益将是最小的。
请注意,完全矢量化的方法不太可能有效,因为主循环内的对象是参差不齐的。
获得一些相对速度的想法:
funcs = foo0, foo1, foo2, foo3, foo4
arr = np.random.rand(100, 24, 24, 16)
timeds_n = {}
for p in range(1):
n = 10 ** p
k = 3
arr = np.random.rand(100, 24, 24, 16)
print(f"N = {arr.size}")
base = funcs[0](arr)
timeds_n[n] = []
for func in funcs:
res = func(arr)
timed = %timeit -r 1 -n 1 -q -o func(arr)
timeds_n[n].append(timed.best)
print(f"{func.__name__:>24} {np.allclose(base, res)} {timed.best:.9f}")
N = 921600
foo0 True 1.757508748
foo1 True 0.095540081
foo2 True 0.179208341
foo3 True 0.160671403
foo4 True 0.155691721
我有一个输入数据为1028*24*24*16的数组。当我 运行 下面的代码时,它运行得非常慢。我怎样才能加快速度?谢谢
(我想从一个大数组中得到3*3矩阵。)
import itertools,math,time,random
import numpy as np
start=time.time()
def ls(x):
x_p = x / np.sum(x)
return np.max(x_p)
inputs = np.random.rand(1028, 24, 24, 16)
b, r, c, ch = inputs.shape[0], inputs.shape[1], inputs.shape[2], inputs.shape[3]
inputs = np.transpose(inputs, (0, 3, 1, 2))
inputs = np.reshape(inputs, (b*ch, r, c))
s=2
r=3
num_r=9
num_c=9
ke = np.zeros((b*ch, num_r, num_c), dtype=np.float32)
for i in range(num_r):
for j in range(num_c):
outs = np.array(list(map(ls, inputs[:, i*s:i*s+r, j*s:j*s+r])))
ke[:, i, j] = outs
ke = np.reshape(ke, (b, ch, num_r, num_c))
ke = np.transpose(ke, (0, 2, 3, 1))
print(time.time() - start)
我认为问题主要在于应该向量化的函数 ls 和花费你时间的列表/映射
import itertools,math,time,random
import numpy as np
start=time.time()
def ls(x):
x_p = x / np.sum(np.sum(x, axis=1), axis=1)[:,None,None]
return np.max(np.max(x_p,axis=1),axis=1)
inputs = np.random.rand(1028, 24, 24, 16)
b, r, c, ch = inputs.shape[0], inputs.shape[1], inputs.shape[2], inputs.shape[3]
inputs = np.transpose(inputs, (0, 3, 1, 2))
inputs = np.reshape(inputs, (b*ch, r, c))
s=2
r=3
num_r=9
num_c=9
ke = np.zeros((b*ch, num_r, num_c), dtype=np.float32)
for i in range(num_r):
print(i)
for j in range(num_c):
# outs = np.array(list(map(ls, inputs[:, i*s:i*s+r, j*s:j*s+r])))
outs = ls(inputs[:, i*s:i*s+r, j*s:j*s+r])
ke[:, i, j] = outs
ke = np.reshape(ke, (b, ch, num_r, num_c))
ke = np.transpose(ke, (0, 2, 3, 1))
print(time.time() - start)
原始代码可以包装(经过一些修饰)到以下函数中:
import numpy as np
def foo0(arr, s=2, r=3, nr=9, nc=9):
forward_axes = (0, 3, 1, 2)
backward_axes = (0, 2, 3, 1)
b, r, c, ch = arr.shape
arr = np.transpose(arr, forward_axes)
arr = np.reshape(arr, (b * ch, r, c))
result = np.zeros((b * ch, nr, nc), dtype=np.float32)
for i in range(nr):
for j in range(nc):
result[:, i, j] = np.fromiter(
map(
lambda x: np.max(x / np.sum(x)),
arr[:, i * s:i * s + r, j * s:j * s + r]),
dtype=np.float32)
result = np.reshape(result, (b, ch, nr, nc))
result = np.transpose(result, backward_axes)
return result
虽然显式循环表明可以在此处应用 Numba 以获得一些 low-hanging 水果,但不幸的是,如果不与 Python 对象交互,则无法轻松装饰该函数,从而大大减少了 speed-up.
幸运的是,核心计算可以很容易地向量化,只要 nr
和 nc
足够小,这种优化就足够了:
def foo1(arr, s=2, r=3, nr=9, nc=9):
forward_axes = (0, 3, 1, 2)
backward_axes = (0, 2, 3, 1)
b, r, c, ch = arr.shape
arr = np.transpose(arr, forward_axes)
arr = np.reshape(arr, (b * ch, r, c))
result = np.zeros((b * ch, nr, nc), dtype=np.float32)
for i in range(nr):
for j in range(nc):
x = arr[:, i * s:i * s + r, j * s:j * s + r]
result[:, i, j] = np.max(x, (-1, -2)) / np.sum(x, (-1, -2))
result = np.reshape(result, (b, ch, nr, nc))
result = np.transpose(result, backward_axes)
return result
(上面的 foo1()
本质上等同于
注意max(x / k)
和max(x) / k
是一样的但是分割数大大减少了
实际上,转置和整形,虽然它可能有助于提高计算速度,但并不是真正必要的:
def foo2(arr, s=2, r=3, nr=9, nc=9):
b, r, c, ch = arr.shape
result = np.zeros((b, nr, nc, ch), dtype=np.float32)
for i in range(nr):
for j in range(nc):
x = arr[:, i * s:i * s + r, j * s:j * s + r, :]
result[:, i, j, :] = np.max(x, (1, 2)) / np.sum(x, (1, 2))
return result
以上内容在 Numba 中翻译起来更简单,但小 nr
/nc
的速度增益将是最小的(与部分矢量化方法相比):
import numba as nb
@nb.njit
def sum_nb(arr):
result = 0
for x in arr:
result += x
return result
@nb.njit
def max_nb(arr):
result = arr[0]
for x in arr[1:]:
if x > result:
result = x
return result
@nb.njit
def _sum_max(arr):
b, r, c, ch = arr.shape
res = np.empty((b, ch), dtype=arr.dtype)
for i in range(b):
for j in range(ch):
x = arr[i, :, :, j].ravel()
res[i, j] = max_nb(x) / sum_nb(x)
return res
@nb.njit
def foo3(arr, s=2, r=3, nr=9, nc=9):
b, r, c, ch = arr.shape
result = np.zeros((b, nr, nc, ch), dtype=np.float32)
for i in range(nr):
for j in range(nc):
result[:, i, j, :] = _sum_max(arr[:, i * s:i * s + r, j * s:j * s + r, :])
return result
另一种选择是将 Numba-incompatible 代码保留在主循环之外:
@nb.njit(fastmath=True)
def _foo4(arr, result, s, r, nr, nc):
bch, nr, nc = result.shape
for i in range(nr):
for j in range(nc):
for k in range(bch):
x = arr[k, i * s:i * s + r, j * s:j * s + r].ravel()
result[k, i, j] = max_nb(x) / sum_nb(x)
return result
def foo4(arr, s=2, r=3, nr=9, nc=9):
forward_axes = (0, 3, 1, 2)
backward_axes = (0, 2, 3, 1)
b, r, c, ch = arr.shape
arr = np.transpose(arr, forward_axes)
arr = np.reshape(arr, (b * ch, r, c))
result = np.empty((b * ch, nr, nc))
result = _foo4(arr, result, s, r, nr, nc)
result = np.reshape(result, (b, ch, nr, nc))
result = np.transpose(result, backward_axes)
return result
但同样速度增益将是最小的。
请注意,完全矢量化的方法不太可能有效,因为主循环内的对象是参差不齐的。
获得一些相对速度的想法:
funcs = foo0, foo1, foo2, foo3, foo4
arr = np.random.rand(100, 24, 24, 16)
timeds_n = {}
for p in range(1):
n = 10 ** p
k = 3
arr = np.random.rand(100, 24, 24, 16)
print(f"N = {arr.size}")
base = funcs[0](arr)
timeds_n[n] = []
for func in funcs:
res = func(arr)
timed = %timeit -r 1 -n 1 -q -o func(arr)
timeds_n[n].append(timed.best)
print(f"{func.__name__:>24} {np.allclose(base, res)} {timed.best:.9f}")
N = 921600
foo0 True 1.757508748
foo1 True 0.095540081
foo2 True 0.179208341
foo3 True 0.160671403
foo4 True 0.155691721