矩阵处理更快

faster processing with matrix

我有一个输入数据为1028*24*24*16的数组。当我 运行 下面的代码时,它运行得非常慢。我怎样才能加快速度?谢谢

(我想从一个大数组中得到3*3矩阵。)

import itertools,math,time,random
import numpy as np

start=time.time()

def ls(x):
     x_p = x / np.sum(x)         
     return np.max(x_p)

inputs = np.random.rand(1028, 24, 24, 16)
b, r, c, ch = inputs.shape[0], inputs.shape[1], inputs.shape[2], inputs.shape[3]

inputs = np.transpose(inputs, (0, 3, 1, 2)) 
inputs = np.reshape(inputs, (b*ch, r, c))

s=2
r=3
num_r=9
num_c=9
ke = np.zeros((b*ch, num_r, num_c), dtype=np.float32)

for i in range(num_r):
     for j in range(num_c):
         outs = np.array(list(map(ls, inputs[:, i*s:i*s+r, j*s:j*s+r])))
         ke[:, i, j] = outs

ke = np.reshape(ke, (b, ch, num_r, num_c))
ke = np.transpose(ke, (0, 2, 3, 1))

print(time.time() - start)

我认为问题主要在于应该向量化的函数 ls 和花费你时间的列表/映射

import itertools,math,time,random
import numpy as np

start=time.time()

def ls(x):
     x_p = x / np.sum(np.sum(x, axis=1), axis=1)[:,None,None]
     return np.max(np.max(x_p,axis=1),axis=1)

inputs = np.random.rand(1028, 24, 24, 16)
b, r, c, ch = inputs.shape[0], inputs.shape[1], inputs.shape[2], inputs.shape[3]

inputs = np.transpose(inputs, (0, 3, 1, 2))
inputs = np.reshape(inputs, (b*ch, r, c))

s=2
r=3
num_r=9
num_c=9
ke = np.zeros((b*ch, num_r, num_c), dtype=np.float32)

for i in range(num_r):
    print(i)
    for j in range(num_c):
        # outs = np.array(list(map(ls, inputs[:, i*s:i*s+r, j*s:j*s+r])))
        outs =  ls(inputs[:, i*s:i*s+r, j*s:j*s+r])
        ke[:, i, j] = outs

ke = np.reshape(ke, (b, ch, num_r, num_c))
ke = np.transpose(ke, (0, 2, 3, 1))

print(time.time() - start)

原始代码可以包装(经过一些修饰)到以下函数中:

import numpy as np


def foo0(arr, s=2, r=3, nr=9, nc=9):
    forward_axes = (0, 3, 1, 2)
    backward_axes = (0, 2, 3, 1)
    b, r, c, ch = arr.shape
    arr = np.transpose(arr, forward_axes) 
    arr = np.reshape(arr, (b * ch, r, c))
    result = np.zeros((b * ch, nr, nc), dtype=np.float32)
    for i in range(nr):
        for j in range(nc):
            result[:, i, j] = np.fromiter(
                map(
                    lambda x: np.max(x / np.sum(x)),
                    arr[:, i * s:i * s + r, j * s:j * s + r]),
                dtype=np.float32)
    result = np.reshape(result, (b, ch, nr, nc))
    result = np.transpose(result, backward_axes)
    return result

虽然显式循环表明可以在此处应用 Numba 以获得一些 low-hanging 水果,但不幸的是,如果不与 Python 对象交互,则无法轻松装饰该函数,从而大大减少了 speed-up. 幸运的是,核心计算可以很容易地向量化,只要 nrnc 足够小,这种优化就足够了:

def foo1(arr, s=2, r=3, nr=9, nc=9):
    forward_axes = (0, 3, 1, 2)
    backward_axes = (0, 2, 3, 1)
    b, r, c, ch = arr.shape
    arr = np.transpose(arr, forward_axes) 
    arr = np.reshape(arr, (b * ch, r, c))
    result = np.zeros((b * ch, nr, nc), dtype=np.float32)
    for i in range(nr):
        for j in range(nc):
            x = arr[:, i * s:i * s + r, j * s:j * s + r]
            result[:, i, j] = np.max(x, (-1, -2)) / np.sum(x, (-1, -2))
    result = np.reshape(result, (b, ch, nr, nc))
    result = np.transpose(result, backward_axes)
    return result

(上面的 foo1() 本质上等同于 并进行了一些额外的优化。)

注意max(x / k)max(x) / k是一样的但是分割数大大减少了

实际上,转置和整形,虽然它可能有助于提高计算速度,但并不是真正必要的:

def foo2(arr, s=2, r=3, nr=9, nc=9):
    b, r, c, ch = arr.shape
    result = np.zeros((b, nr, nc, ch), dtype=np.float32)
    for i in range(nr):
        for j in range(nc):
            x = arr[:, i * s:i * s + r, j * s:j * s + r, :]
            result[:, i, j, :] = np.max(x, (1, 2)) / np.sum(x, (1, 2))
    return result

以上内容在 Numba 中翻译起来更简单,但小 nr/nc 的速度增益将是最小的(与部分矢量化方法相比):

import numba as nb


@nb.njit
def sum_nb(arr):
    result = 0
    for x in arr:
        result += x
    return result


@nb.njit
def max_nb(arr):
    result = arr[0]
    for x in arr[1:]:
        if x > result:
            result = x
    return result


@nb.njit
def _sum_max(arr):
    b, r, c, ch = arr.shape
    res = np.empty((b, ch), dtype=arr.dtype)
    for i in range(b):
        for j in range(ch):
            x = arr[i, :, :, j].ravel()
            res[i, j] = max_nb(x) / sum_nb(x)
    return res


@nb.njit
def foo3(arr, s=2, r=3, nr=9, nc=9):
    b, r, c, ch = arr.shape
    result = np.zeros((b, nr, nc, ch), dtype=np.float32)
    for i in range(nr):
        for j in range(nc):
            result[:, i, j, :] = _sum_max(arr[:, i * s:i * s + r, j * s:j * s + r, :])
    return result

另一种选择是将 Numba-incompatible 代码保留在主循环之外:

@nb.njit(fastmath=True)
def _foo4(arr, result, s, r, nr, nc):
    bch, nr, nc = result.shape
    for i in range(nr):
        for j in range(nc):
            for k in range(bch):
                x = arr[k, i * s:i * s + r, j * s:j * s + r].ravel()
                result[k, i, j] = max_nb(x) / sum_nb(x)
    return result


def foo4(arr, s=2, r=3, nr=9, nc=9):
    forward_axes = (0, 3, 1, 2)
    backward_axes = (0, 2, 3, 1)
    b, r, c, ch = arr.shape
    arr = np.transpose(arr, forward_axes) 
    arr = np.reshape(arr, (b * ch, r, c))
    result = np.empty((b * ch, nr, nc))
    result = _foo4(arr, result, s, r, nr, nc)
    result = np.reshape(result, (b, ch, nr, nc))
    result = np.transpose(result, backward_axes)
    return result

但同样速度增益将是最小的。

请注意,完全矢量化的方法不太可能有效,因为主循环内的对象是参差不齐的。

获得一些相对速度的想法:

funcs = foo0, foo1, foo2, foo3, foo4
arr = np.random.rand(100, 24, 24, 16)


timeds_n = {}
for p in range(1):
    n = 10 ** p
    k = 3
    arr = np.random.rand(100, 24, 24, 16)
    print(f"N = {arr.size}")
    base = funcs[0](arr)
    timeds_n[n] = []
    for func in funcs:
        res = func(arr)
        timed = %timeit -r 1 -n 1 -q -o func(arr)
        timeds_n[n].append(timed.best)
        print(f"{func.__name__:>24}  {np.allclose(base, res)}  {timed.best:.9f}")
N = 921600
                    foo0  True  1.757508748
                    foo1  True  0.095540081
                    foo2  True  0.179208341
                    foo3  True  0.160671403
                    foo4  True  0.155691721