如何向量化这个二维矩阵运算?

How to Vectorize This 2D Matrix Operation?

我有 2 个布尔掩码数组,我希望计算两个掩码的每个组合的操作。

The slow version

N = 10000
M = 580
masksA = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)
masksB = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)

result = np.zeros(shape=(N,N), dtype=np.float)
for i in range(N):
    for j in range(N):
        result[i,j] = np.float64(np.count_nonzero(masksB[i,:]==masksB[j,:])) / M

The faster version

for i in range(N):
    result[i,:] = np.array(np.count_nonzero(masksB[i]==masksA, axis=1), dtype=np.float64) / M

可以用一个循环更快地完成吗?

这基本上是外部相等比较,保持第一个掩码轴对齐。我们可以利用 matrix-multiplication 的想法,即掩码本身及其同时取反版本的矩阵乘法将引导我们得出那些外部相等求和。因此,对于等式的求和,只需执行 -

out = (~masksB).astype(int).dot(~masksA.T) + masksB.astype(int).dot(masksA.T)

或者,获取 int 版本并使用它来获取否定版本 -

mB = masksB.astype(int)
out = (1-mB).dot(~masksA.T) + mB.dot(masksA.T)

最终输出将是 out/float(M)。或者我们可以用 float 替换那些 int 转换,然后将输出除以 M

获得等式求和的时间(N 为 1000)

# Setup
In [39]: np.random.seed(0)
    ...: N = 1000
    ...: M = 580
    ...: masksA = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)
    ...: masksB = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)

# Approach #1 with int dtype converison
In [40]: %timeit (~masksB).astype(int).dot(~masksA.T) + masksB.astype(int).dot(masksA.T)
1 loop, best of 3: 870 ms per loop

# Approach #1 with float dtype converison
In [41]: %timeit (~masksB).astype(float).dot(~masksA.T) + masksB.astype(float).dot(masksA.T)
10 loops, best of 3: 74 ms per loop

# Approach #2 with int dtype converison
In [42]: %%timeit
    ...: mB = masksB.astype(int)
    ...: out = (1-mB).dot(~masksA.T) + mB.dot(masksA.T)
1 loop, best of 3: 882 ms per loop

# Approach #2 with float dtype converison
In [43]: %%timeit
    ...: mB = masksB.astype(float)
    ...: out = (1-mB).dot(~masksA.T) + mB.dot(masksA.T)
10 loops, best of 3: 59.3 ms per loop

因此,首选方法是在第二种方法中进行浮点数转换,并将输出除以 M

使用一个带堆叠的矩阵乘法

由于等式求和本质上是 int 数字,我们可以使用较低精度的 dtype 进行矩阵乘法。此外,还有一个想法是将原始掩码与其取反版本叠加,然后执行矩阵乘法。因此,对于相等求和,我们将有 -

m1 = np.hstack((masksA,~masksA))
m2 = np.hstack((masksB,~masksB))
out = m2.astype(np.float32).dot(m1.T)

计时(与之前的设置相同)-

In [49]: %%timeit
    ...: m1 = np.hstack((masksA,~masksA))
    ...: m2 = np.hstack((masksB,~masksB))
    ...: out = m2.astype(np.float32).dot(m1.T)
10 loops, best of 3: 36.8 ms per loop

我们可以通过仅使用一个矩阵乘法有效地将@Divakar 的(当前)最佳时间一分为二:

import numpy as np

N = 1000
M = 580
masksA = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)
masksB = np.array(np.random.randint(0,2, size=(N,M)), dtype=np.bool)

def f_pp():
    return np.where(masksB,1.0,-1.0)@np.where(masksA,0.5/M,-0.5/M).T+0.5

def f_pp_2():
    return (np.where(masksB,1.0,-1.0)@np.where(masksA,0.5,-0.5).T+0.5*M)*(1/M)

def f_dv_2():
    mB = masksB.astype(float)
    return ((1-mB).dot(~masksA.T) + mB.dot(masksA.T))*(1/M)

assert np.allclose(f_pp(),f_dv_2())
assert np.all(f_pp_2()==f_dv_2())

from timeit import timeit

print("PP     ",timeit(f_pp,number=100)*10,"ms")
print("PP 2   ",timeit(f_pp_2,number=100)*10,"ms")
print("Divakar",timeit(f_dv_2,number=100)*10,"ms")

样本运行:

PP      31.41063162125647 ms
PP 2    31.757128546014428 ms
Divakar 63.400797033682466 ms

它是如何工作的?通过将 True 和 False 映射到相反的数字 x-x(不同的 x 可以用于这两个因素;假设 a 用于 masksA 和 b对于 masksB),我们在点积中为掩码一致的每个位置获得一个项 ab,并为它们不同的每个位置获得一个项 -ab。如果 k 是一对向量之间相等的位数,那么它们的点积将为 kab-(M-k)ab = 2kab - Mab。选择 ab 这样 2Mab = 1 就变成了 k/M - 1/2 这就是我们想要的结果的常数偏移量。