相对原点累加滑动windows
Accumulate sliding windows relative to origin
我有一个形状为 (3,3)
的数组 A
,它可以被认为是形状为 (5,)
的未知数组的滑动 window 视图。我想计算 window 形状为 (5,)
的数组的倒数。伴随运算就是求和。我的意思是我想将每个对应 window 中的值与形状为 (5,)
的数组中的相关位置累加起来。当然,我这个反函数的预期输出和输入A
没有关系,只是普通的数组。我有两个例子,希望能更好地解释这一点。
A = np.array([[0, 0, 1],
[0, 0, 1],
[0, 0, 1]], dtype=np.float32)
我期望这样的输出:
np.array([0, 0, 1, 1, 1])
另一个例子:
A = np.array([[1, 2, 3],
[2, 3, 4],
[3, 4, 5]], dtype=np.float32)
我期望这样的输出:
np.array([1, 2+2, 3+3+3, 4+4, 5]) = np.array([1, 4, 9, 8, 5])
我的解决方案很慢(结果存储在out
)
out = np.zeros(5, dtype=np.float32)
windows = np.lib.stride_tricks.as_strided(out, shape=(3,3), strides=(4,4))
for i in np.ndindex(windows.shape):
windows[i] += A[i]
写入跨步视图感觉有点老套,我相信有更好的解决方案。
有没有办法不用 for 循环以矢量化的方式编写它? (这也适用于多个维度)
编辑
就更高维度的泛化而言,我遇到过 windows 取自图像(二维数组)的情况,而不是像上面的示例那样取自一维数组。对于 2d 情况,A
可以是 windows 大小 3
。这意味着从形状为 (4,4)
的图像(输出)中,windows A
将具有形状 (2,2,3,3)
.
A = np.array([[[[0, 0, 0],
[0, 1, 0],
[0, 0, 0]],
[[0, 0, 0],
[1, 0, 0],
[0, 0, 0]]],
[[[0, 1, 0],
[0, 0, 0],
[0, 0, 0]],
[[1, 0, 0],
[0, 0, 0],
[0, 0, 0]]]], dtype=np.float32)
使用Pablo给出的解决方案,出现如下错误
value array of shape (2,2,3,3) could not be broadcast to indexing result of shape (2,2)
使用我的步幅解决方案的略微修改版本:
def inverse_sliding_windows(A, window_sz, image_sz):
out = np.zeros(image_sz, dtype=np.float32)
windows = np.lib.stride_tricks.sliding_window_view(out, window_sz, writeable=True)
for i in np.ndindex(windows.shape):
windows[i] += A[i]
window_sz = (3,3)
image_sz = (4,4)
inverse_sliding_windows(A, window_sz, image_sz)
输出:
array([[0., 0., 0., 0.],
[0., 4., 0., 0.],
[0., 0., 0., 0.],
[0., 0., 0., 0.]], dtype=float32)
澄清一下,window 大小和输出形状是事先已知的,参见 inverse_sliding_windows
。
IIUC 这里提出的问题相当于将矩阵 A
旋转 -45 度并按行求和(至少对于 2D 版本)。为了更好地理解旋转矩阵的含义,请参阅 this post。
def rotate45_and_sum(A):
n = len(A)
x, y = np.meshgrid(np.arange(n), np.arange(n))
xn, yn = x + y, n - x + y - 1
M = np.zeros((2*n -1, 2*n -1))
M[xn,yn] = A[x,y]
return M.sum(1)
A = np.array([[0, 0, 1],
[0, 0, 1],
[0, 0, 1]], dtype=np.float32)
print(rotate45_and_sum(A))
#[0. 0. 1. 1. 1.]
A = np.array([[1, 2, 3],
[2, 3, 4],
[3, 4, 5]], dtype=np.float32)
print(rotate45_and_sum(A))
#[1. 4. 9. 8. 5.]
M
是旋转后的矩阵。
免责声明:我不知道这是否可以推广到多个维度
正如我在评论中提到的,矢量化解决方案并不总能保证更好的 运行 时间。如果您的矩阵很大,您可能更喜欢更有效的方法。矩阵旋转缓慢的原因有很多(尽管很直观),请参阅评论。
性能比较:
Solution: Wall time: 61.6 ms
Rotation: Wall time: 3.32 s
代码(在 jupyter notebook 中测试)
import numpy as np
def rotate45_and_sum(A):
n = len(A)
x, y = np.meshgrid(np.arange(n), np.arange(n)) # at least doubled the running time
xn, yn = x + y, n - x + y - 1 # generating xn and yn at least doubled the running time
M = np.zeros((2*n -1, 2*n -1)) # at least slows down running time by a factor of 4
M[xn,yn] = A[x,y] # very inefficient indexing strategy
return M.sum(1)
def solution(A):
n = A.shape[0]
retval = np.zeros(2*n-1)
for i in range(n):
retval[i:(i+n)] += A[i, :]
return retval
A = np.random.randn(10000, 10000)
%time solution(A)
%time rotate45_and_sum(A)
在多维情况下:
def solution(A):
h,w,x,y = A.shape # change here
retval = np.zeros((2*x-w,2*y-h)) # change here
indices = np.ndindex(w, h) # change here
for index in indices:
slices = tuple()
for i in range(len(index)):
slices = slices + (slice(index[i], index[i]+x),) # I assume x = y = ..., you need to change here also if the assumption is not correct
retval[slices] += A[index] # slices is roughly equal `i:(i+x), j:(j+y)` in your code
return retval
实际上我不知道如何根据您的描述计算尺寸(或形状):(。但我认为它可以概括。我的想法是随手构建 slices
。所以需要指定哪些维度对应h, w
,哪些维度对应x, y
。我觉得不难做到。
参考:
关于
def fast(A):
n = A.shape[0]
retval = np.zeros(2*n-1)
for i in range(n):
retval[i:(i+n)] += A[i, :]
print(retval.sum())
return retval
##########################
import threading
class sumThread(threading.Thread):
def __init__(self, A, mat, threadID, ngroups, size):
threading.Thread.__init__(self)
self.threadID = threadID
self.size = size
self.ngroups = ngroups
self.mat = mat
self.A = A
def run(self):
begin = (self.size + self.ngroups) // self.ngroups * self.threadID
end = min(self.size, (self.size+self.ngroups)//self.ngroups*(self.threadID+1))
for i in range(begin, end):
self.mat[self.threadID, i:(i+self.size)] += self.A[i, :]
def faster(A):
num_threads = max(1, A.shape[0] // 4000)
mat = np.zeros((num_threads, 2*A.shape[0]-1))
threads = []
for i in range(num_threads):
t = sumThread(A, mat, i, num_threads, A.shape[0])
t.start()
threads.append(t)
# Wait for all threads to complete
for t in threads:
t.join()
return np.sum(mat, axis=0)
大数组的性能:
A = np.random.randn(20000,20000)
%timeit fast(A) # 263 ms ± 5.21 ms per loop
%timeit faster(A) # 155 ms ± 3.14 ms per loop
在 fast
中并行化 for
循环很简单。但 fast
实际上是最高效的缓存(即使对于 GPU 缓存和内存组),因此也是最快的计算方式。理想情况下,您可以使用 CUDA/OpenCL 并行化代码,因为 GPU 中有更多的内核。如果操作正确,运行 时间将减少到 log(original_fast_time)
,基数 k
,其中 k
是您拥有的核心数。
然而,函数中只有少量计算。因此,内存和 GRAM 之间的数据传输可能占主导地位。 (我没测试)
扩展@Shihao Xu 给出的 fast
解决方案,我尝试通过在 numpy/core/src/multiarray:
中添加 np.fast_compiled
函数将其翻译成可编译的 c 代码
NPY_NO_EXPORT PyObject *
arr_fast_compiled(PyObject *NPY_UNUSED(self), PyObject *args)
{
PyObject *list_obj = NULL;
PyArrayObject *list_arr = NULL, *ans = NULL;
npy_intp len, ans_size;
npy_intp i, j, k;
double *dans, *numbers;
if (!PyArg_ParseTuple(args, "O", &list_obj)) {
goto fail;
}
list_arr = (PyArrayObject *)PyArray_ContiguousFromAny(list_obj, NPY_DOUBLE, 2, 2);
if (list_arr == NULL) {
goto fail;
}
len = PyArray_DIM(list_arr, 0);
numbers = (double *)PyArray_DATA(list_arr);
ans_size = 2*len-1;
ans = (PyArrayObject *)PyArray_ZEROS(1, &ans_size, NPY_DOUBLE, 0);
if (ans == NULL) {
goto fail;
}
dans = (double *)PyArray_DATA(ans);
NPY_BEGIN_ALLOW_THREADS;
for (i = 0; i < len; ++i) {
k = i * len;
for (j = i; j < i + len; ++j, ++k) {
dans[j] += numbers[k];
}
}
NPY_END_ALLOW_THREADS;
Py_DECREF(list_arr);
return (PyObject *)ans;
fail:
Py_XDECREF(list_arr);
Py_XDECREF(ans);
return NULL;
}
for 循环是最重要的:
for (i = 0; i < len; ++i) {
k = i * len;
for (j = i; j < i + len; ++j, ++k) {
dans[j] += numbers[k];
}
}
numbers
是输入参数 (A
),我们以跨步方式访问 numbers
和 dans
中的元素。在我的 3x3 示例中,我们有以下 j
和 k
值:
j = [0, 1, 2, 1, 2, 3, 2, 3, 4]
k = [0, 1, 2, 3, 4, 5, 6, 7, 8]
NPY_BEGIN_ALLOW_THREADS
是我经常看到的用于其他 numpy 函数的东西,但是当我不使用它进行测试时似乎没有性能差异。
性能与just_sum_0
相似
我有一个形状为 (3,3)
的数组 A
,它可以被认为是形状为 (5,)
的未知数组的滑动 window 视图。我想计算 window 形状为 (5,)
的数组的倒数。伴随运算就是求和。我的意思是我想将每个对应 window 中的值与形状为 (5,)
的数组中的相关位置累加起来。当然,我这个反函数的预期输出和输入A
没有关系,只是普通的数组。我有两个例子,希望能更好地解释这一点。
A = np.array([[0, 0, 1],
[0, 0, 1],
[0, 0, 1]], dtype=np.float32)
我期望这样的输出:
np.array([0, 0, 1, 1, 1])
另一个例子:
A = np.array([[1, 2, 3],
[2, 3, 4],
[3, 4, 5]], dtype=np.float32)
我期望这样的输出:
np.array([1, 2+2, 3+3+3, 4+4, 5]) = np.array([1, 4, 9, 8, 5])
我的解决方案很慢(结果存储在out
)
out = np.zeros(5, dtype=np.float32)
windows = np.lib.stride_tricks.as_strided(out, shape=(3,3), strides=(4,4))
for i in np.ndindex(windows.shape):
windows[i] += A[i]
写入跨步视图感觉有点老套,我相信有更好的解决方案。
有没有办法不用 for 循环以矢量化的方式编写它? (这也适用于多个维度)
编辑
就更高维度的泛化而言,我遇到过 windows 取自图像(二维数组)的情况,而不是像上面的示例那样取自一维数组。对于 2d 情况,A
可以是 windows 大小 3
。这意味着从形状为 (4,4)
的图像(输出)中,windows A
将具有形状 (2,2,3,3)
.
A = np.array([[[[0, 0, 0],
[0, 1, 0],
[0, 0, 0]],
[[0, 0, 0],
[1, 0, 0],
[0, 0, 0]]],
[[[0, 1, 0],
[0, 0, 0],
[0, 0, 0]],
[[1, 0, 0],
[0, 0, 0],
[0, 0, 0]]]], dtype=np.float32)
使用Pablo给出的解决方案,出现如下错误
value array of shape (2,2,3,3) could not be broadcast to indexing result of shape (2,2)
使用我的步幅解决方案的略微修改版本:
def inverse_sliding_windows(A, window_sz, image_sz):
out = np.zeros(image_sz, dtype=np.float32)
windows = np.lib.stride_tricks.sliding_window_view(out, window_sz, writeable=True)
for i in np.ndindex(windows.shape):
windows[i] += A[i]
window_sz = (3,3)
image_sz = (4,4)
inverse_sliding_windows(A, window_sz, image_sz)
输出:
array([[0., 0., 0., 0.],
[0., 4., 0., 0.],
[0., 0., 0., 0.],
[0., 0., 0., 0.]], dtype=float32)
澄清一下,window 大小和输出形状是事先已知的,参见 inverse_sliding_windows
。
IIUC 这里提出的问题相当于将矩阵 A
旋转 -45 度并按行求和(至少对于 2D 版本)。为了更好地理解旋转矩阵的含义,请参阅 this post。
def rotate45_and_sum(A):
n = len(A)
x, y = np.meshgrid(np.arange(n), np.arange(n))
xn, yn = x + y, n - x + y - 1
M = np.zeros((2*n -1, 2*n -1))
M[xn,yn] = A[x,y]
return M.sum(1)
A = np.array([[0, 0, 1],
[0, 0, 1],
[0, 0, 1]], dtype=np.float32)
print(rotate45_and_sum(A))
#[0. 0. 1. 1. 1.]
A = np.array([[1, 2, 3],
[2, 3, 4],
[3, 4, 5]], dtype=np.float32)
print(rotate45_and_sum(A))
#[1. 4. 9. 8. 5.]
M
是旋转后的矩阵。
免责声明:我不知道这是否可以推广到多个维度
正如我在评论中提到的,矢量化解决方案并不总能保证更好的 运行 时间。如果您的矩阵很大,您可能更喜欢更有效的方法。矩阵旋转缓慢的原因有很多(尽管很直观),请参阅评论。
性能比较:
Solution: Wall time: 61.6 ms
Rotation: Wall time: 3.32 s
代码(在 jupyter notebook 中测试)
import numpy as np
def rotate45_and_sum(A):
n = len(A)
x, y = np.meshgrid(np.arange(n), np.arange(n)) # at least doubled the running time
xn, yn = x + y, n - x + y - 1 # generating xn and yn at least doubled the running time
M = np.zeros((2*n -1, 2*n -1)) # at least slows down running time by a factor of 4
M[xn,yn] = A[x,y] # very inefficient indexing strategy
return M.sum(1)
def solution(A):
n = A.shape[0]
retval = np.zeros(2*n-1)
for i in range(n):
retval[i:(i+n)] += A[i, :]
return retval
A = np.random.randn(10000, 10000)
%time solution(A)
%time rotate45_and_sum(A)
在多维情况下:
def solution(A):
h,w,x,y = A.shape # change here
retval = np.zeros((2*x-w,2*y-h)) # change here
indices = np.ndindex(w, h) # change here
for index in indices:
slices = tuple()
for i in range(len(index)):
slices = slices + (slice(index[i], index[i]+x),) # I assume x = y = ..., you need to change here also if the assumption is not correct
retval[slices] += A[index] # slices is roughly equal `i:(i+x), j:(j+y)` in your code
return retval
实际上我不知道如何根据您的描述计算尺寸(或形状):(。但我认为它可以概括。我的想法是随手构建 slices
。所以需要指定哪些维度对应h, w
,哪些维度对应x, y
。我觉得不难做到。
参考:
关于
def fast(A):
n = A.shape[0]
retval = np.zeros(2*n-1)
for i in range(n):
retval[i:(i+n)] += A[i, :]
print(retval.sum())
return retval
##########################
import threading
class sumThread(threading.Thread):
def __init__(self, A, mat, threadID, ngroups, size):
threading.Thread.__init__(self)
self.threadID = threadID
self.size = size
self.ngroups = ngroups
self.mat = mat
self.A = A
def run(self):
begin = (self.size + self.ngroups) // self.ngroups * self.threadID
end = min(self.size, (self.size+self.ngroups)//self.ngroups*(self.threadID+1))
for i in range(begin, end):
self.mat[self.threadID, i:(i+self.size)] += self.A[i, :]
def faster(A):
num_threads = max(1, A.shape[0] // 4000)
mat = np.zeros((num_threads, 2*A.shape[0]-1))
threads = []
for i in range(num_threads):
t = sumThread(A, mat, i, num_threads, A.shape[0])
t.start()
threads.append(t)
# Wait for all threads to complete
for t in threads:
t.join()
return np.sum(mat, axis=0)
大数组的性能:
A = np.random.randn(20000,20000)
%timeit fast(A) # 263 ms ± 5.21 ms per loop
%timeit faster(A) # 155 ms ± 3.14 ms per loop
在 fast
中并行化 for
循环很简单。但 fast
实际上是最高效的缓存(即使对于 GPU 缓存和内存组),因此也是最快的计算方式。理想情况下,您可以使用 CUDA/OpenCL 并行化代码,因为 GPU 中有更多的内核。如果操作正确,运行 时间将减少到 log(original_fast_time)
,基数 k
,其中 k
是您拥有的核心数。
然而,函数中只有少量计算。因此,内存和 GRAM 之间的数据传输可能占主导地位。 (我没测试)
扩展@Shihao Xu 给出的 fast
解决方案,我尝试通过在 numpy/core/src/multiarray:
np.fast_compiled
函数将其翻译成可编译的 c 代码
NPY_NO_EXPORT PyObject *
arr_fast_compiled(PyObject *NPY_UNUSED(self), PyObject *args)
{
PyObject *list_obj = NULL;
PyArrayObject *list_arr = NULL, *ans = NULL;
npy_intp len, ans_size;
npy_intp i, j, k;
double *dans, *numbers;
if (!PyArg_ParseTuple(args, "O", &list_obj)) {
goto fail;
}
list_arr = (PyArrayObject *)PyArray_ContiguousFromAny(list_obj, NPY_DOUBLE, 2, 2);
if (list_arr == NULL) {
goto fail;
}
len = PyArray_DIM(list_arr, 0);
numbers = (double *)PyArray_DATA(list_arr);
ans_size = 2*len-1;
ans = (PyArrayObject *)PyArray_ZEROS(1, &ans_size, NPY_DOUBLE, 0);
if (ans == NULL) {
goto fail;
}
dans = (double *)PyArray_DATA(ans);
NPY_BEGIN_ALLOW_THREADS;
for (i = 0; i < len; ++i) {
k = i * len;
for (j = i; j < i + len; ++j, ++k) {
dans[j] += numbers[k];
}
}
NPY_END_ALLOW_THREADS;
Py_DECREF(list_arr);
return (PyObject *)ans;
fail:
Py_XDECREF(list_arr);
Py_XDECREF(ans);
return NULL;
}
for 循环是最重要的:
for (i = 0; i < len; ++i) {
k = i * len;
for (j = i; j < i + len; ++j, ++k) {
dans[j] += numbers[k];
}
}
numbers
是输入参数 (A
),我们以跨步方式访问 numbers
和 dans
中的元素。在我的 3x3 示例中,我们有以下 j
和 k
值:
j = [0, 1, 2, 1, 2, 3, 2, 3, 4]
k = [0, 1, 2, 3, 4, 5, 6, 7, 8]
NPY_BEGIN_ALLOW_THREADS
是我经常看到的用于其他 numpy 函数的东西,但是当我不使用它进行测试时似乎没有性能差异。
性能与just_sum_0
相似