用于比较两个 NumPy 数组的逐元素广播?
Element-wise broadcasting for comparing two NumPy arrays?
假设我有一个这样的数组:
import numpy as np
base_array = np.array([-13, -9, -11, -3, -3, -4, 2, 2,
2, 5, 7, 7, 8, 7, 12, 11])
假设我想知道:"how many elements in base_array
are greater than 4?"这可以简单地通过利用广播来完成:
np.sum(4 < base_array)
答案是7
。现在,假设我不想与单个值进行比较,而是想对数组进行比较。换句话说,对于 comparison_array
中的每个值 c
,找出 base_array
中有多少元素大于 c
。如果我以天真的方式这样做,它显然会失败,因为它不知道如何正确地广播它:
comparison_array = np.arange(-13, 13)
comparison_result = np.sum(comparison_array < base_array)
输出:
Traceback (most recent call last):
File "<pyshell#87>", line 1, in <module>
np.sum(comparison_array < base_array)
ValueError: operands could not be broadcast together with shapes (26,) (16,)
如果我能以某种方式让 comparison_array
的每个元素都广播到 base_array
的形状,那将解决这个问题。但是我不知道如何做这样的 "element-wise broadcasting".
现在,我知道如何使用列表理解为这两种情况实现它:
first = sum([4 < i for i in base_array])
second = [sum([c < i for i in base_array])
for c in comparison_array]
print(first)
print(second)
输出:
7
[15, 15, 14, 14, 13, 13, 13, 13, 13, 12, 10, 10, 10, 10, 10, 7, 7, 7, 6, 6, 3, 2, 2, 2, 1, 0]
但众所周知,这将比在较大数组上正确矢量化 numpy
实施慢几个数量级。那么,我应该如何在 numpy
中执行此操作才能使其快速?理想情况下,此解决方案应扩展到广播有效的任何类型的操作,而不仅仅是本例中的大于或小于。
您可以简单地向比较数组添加一个维度,以便 "stretched" 比较沿新维度的所有值。
>>> np.sum(comparison_array[:, None] < base_array)
228
这是 broadcasting 的基本原则,适用于所有类型的操作。
如果需要沿轴求和,只需指定比较后要沿其求和的轴即可。
>>> np.sum(comparison_array[:, None] < base_array, axis=1)
array([15, 15, 14, 14, 13, 13, 13, 13, 13, 12, 10, 10, 10, 10, 10, 7, 7,
7, 6, 6, 3, 2, 2, 2, 1, 0])
您需要转置其中一个阵列以便广播正常工作。当您一起广播两个数组时,维度会排成一行,任何单位维度都会有效地扩展到它们匹配的非单位大小。因此两个大小为 (16, 1)
(原始数组)和 (1, 26)
(比较数组)的数组将广播到 (16, 26)
.
别忘了对大小为 16 的维度求和:
(base_array[:, None] > comparison_array).sum(axis=1)
切片中的 None
等同于 np.newaxis
:这是在指定索引处插入新单位维度的多种方法之一。您不需要这样做的原因 comparison_array[None, :]
是广播排列最高维度,并自动填充最低维度。
这里有一个 np.searchsorted
重点关注内存效率和性能 -
def get_comparative_sum(base_array, comparison_array):
n = len(base_array)
base_array_sorted = np.sort(base_array)
idx = np.searchsorted(base_array_sorted, comparison_array, 'right')
idx[idx==n] = n-1
return n - idx - (base_array_sorted[idx] == comparison_array)
计时 -
In [40]: np.random.seed(0)
...: base_array = np.random.randint(-1000,1000,(10000))
...: comparison_array = np.random.randint(-1000,1000,(20000))
# @miradulo's soln
In [41]: %timeit np.sum(comparison_array[:, None] < base_array, axis=1)
1 loop, best of 3: 386 ms per loop
In [42]: %timeit get_comparative_sum(base_array, comparison_array)
100 loops, best of 3: 2.36 ms per loop
假设我有一个这样的数组:
import numpy as np
base_array = np.array([-13, -9, -11, -3, -3, -4, 2, 2,
2, 5, 7, 7, 8, 7, 12, 11])
假设我想知道:"how many elements in base_array
are greater than 4?"这可以简单地通过利用广播来完成:
np.sum(4 < base_array)
答案是7
。现在,假设我不想与单个值进行比较,而是想对数组进行比较。换句话说,对于 comparison_array
中的每个值 c
,找出 base_array
中有多少元素大于 c
。如果我以天真的方式这样做,它显然会失败,因为它不知道如何正确地广播它:
comparison_array = np.arange(-13, 13)
comparison_result = np.sum(comparison_array < base_array)
输出:
Traceback (most recent call last):
File "<pyshell#87>", line 1, in <module>
np.sum(comparison_array < base_array)
ValueError: operands could not be broadcast together with shapes (26,) (16,)
如果我能以某种方式让 comparison_array
的每个元素都广播到 base_array
的形状,那将解决这个问题。但是我不知道如何做这样的 "element-wise broadcasting".
现在,我知道如何使用列表理解为这两种情况实现它:
first = sum([4 < i for i in base_array])
second = [sum([c < i for i in base_array])
for c in comparison_array]
print(first)
print(second)
输出:
7
[15, 15, 14, 14, 13, 13, 13, 13, 13, 12, 10, 10, 10, 10, 10, 7, 7, 7, 6, 6, 3, 2, 2, 2, 1, 0]
但众所周知,这将比在较大数组上正确矢量化 numpy
实施慢几个数量级。那么,我应该如何在 numpy
中执行此操作才能使其快速?理想情况下,此解决方案应扩展到广播有效的任何类型的操作,而不仅仅是本例中的大于或小于。
您可以简单地向比较数组添加一个维度,以便 "stretched" 比较沿新维度的所有值。
>>> np.sum(comparison_array[:, None] < base_array)
228
这是 broadcasting 的基本原则,适用于所有类型的操作。
如果需要沿轴求和,只需指定比较后要沿其求和的轴即可。
>>> np.sum(comparison_array[:, None] < base_array, axis=1)
array([15, 15, 14, 14, 13, 13, 13, 13, 13, 12, 10, 10, 10, 10, 10, 7, 7,
7, 6, 6, 3, 2, 2, 2, 1, 0])
您需要转置其中一个阵列以便广播正常工作。当您一起广播两个数组时,维度会排成一行,任何单位维度都会有效地扩展到它们匹配的非单位大小。因此两个大小为 (16, 1)
(原始数组)和 (1, 26)
(比较数组)的数组将广播到 (16, 26)
.
别忘了对大小为 16 的维度求和:
(base_array[:, None] > comparison_array).sum(axis=1)
切片中的 None
等同于 np.newaxis
:这是在指定索引处插入新单位维度的多种方法之一。您不需要这样做的原因 comparison_array[None, :]
是广播排列最高维度,并自动填充最低维度。
这里有一个 np.searchsorted
重点关注内存效率和性能 -
def get_comparative_sum(base_array, comparison_array):
n = len(base_array)
base_array_sorted = np.sort(base_array)
idx = np.searchsorted(base_array_sorted, comparison_array, 'right')
idx[idx==n] = n-1
return n - idx - (base_array_sorted[idx] == comparison_array)
计时 -
In [40]: np.random.seed(0)
...: base_array = np.random.randint(-1000,1000,(10000))
...: comparison_array = np.random.randint(-1000,1000,(20000))
# @miradulo's soln
In [41]: %timeit np.sum(comparison_array[:, None] < base_array, axis=1)
1 loop, best of 3: 386 ms per loop
In [42]: %timeit get_comparative_sum(base_array, comparison_array)
100 loops, best of 3: 2.36 ms per loop