为什么 numpy 笛卡尔积比纯 python 版本慢?
Why is numpy cartesian product slower than pure python version?
输入
import numpy as np
import itertools
a = np.array([ 1, 6, 7, 8, 10, 11, 13, 14, 15, 19, 20, 23, 24, 26, 28, 29, 33,
34, 41, 42, 43, 44, 45, 46, 47, 52, 54, 58, 60, 61, 65, 70, 75]).astype(np.uint8)
b = np.array([ 2, 3, 4, 10, 12, 14, 16, 20, 22, 26, 28, 29, 30, 31, 34, 36, 37,
38, 39, 40, 41, 46, 48, 49, 50, 52, 53, 55, 56, 57, 59, 60, 63, 66,
67, 68, 69, 70, 71, 74]).astype(np.uint8)
c = np.array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
68, 69, 70, 71, 72, 73, 74, 75]).astype(np.uint8)
我想得到 3 个数组的笛卡尔积,但我不希望一行中有任何重复元素 [1, 2, 1]
将无效,并且这两个中只有一个有效 [10, 14, 0]
或 [14, 10, 0]
因为 10 和 14 都在 a
和 b
.
中
Python 仅
def no_numpy():
combos = {tuple(set(i)): i for i in itertools.product(a, b, c)}
combos = [val for key, val in combos.items() if len(key) == 3]
%timeit no_numpy() # 32.5 ms ± 508 µs per loop
Numpy
# Solution from (
def cartesian_product(*arrays):
broadcastable = np.ix_(*arrays)
broadcasted = np.broadcast_arrays(*broadcastable)
rows, cols = np.prod(broadcasted[0].shape), len(broadcasted)
dtype = np.result_type(*arrays)
out = np.empty(rows * cols, dtype=dtype)
start, end = 0, rows
for a in broadcasted:
out[start:end] = a.reshape(-1)
start, end = end, end + rows
return out.reshape(cols, rows).T
def numpy():
combos = {tuple(set(i)): i for i in cartesian_product(*[a, b, c])}
combos = [val for key, val in combos.items() if len(key) == 3]
%timeit numpy() # 96.2 ms ± 136 µs per loop
我的猜测是在 numpy 版本中将 np.array
转换为集合是它慢得多的原因,但是当严格比较初始产品时 cartesian_product
比 itertools.product
快得多.
是否可以修改 numpy 版本以胜过纯 python 解决方案,或者是否有其他解决方案可以胜过两者?
要让 numpy 的运行速度与过滤后的 python 迭代器一样快将非常困难,因为 numpy 处理的整个结构将不可避免地大于过滤集的结果。
这是我能想到的最好的处理数组乘积的方法,即根据不同值的唯一组合过滤结果:
def npProductSets(a,b,*others):
if len(a.shape)<2 : a = a[:,None]
if len(b.shape)<2 : b = b[:,None]
left = np.repeat(a,b.shape[0],axis=0)
right = np.tile(b,(a.shape[0],1))
distinct = ~np.any(right==left,axis=1)
prod = np.concatenate((left[distinct],right[distinct]),axis=1)
prod.sort(axis=1)
prod = np.unique(prod,axis=0)
if others:
return npProductSets(prod,*others)
return prod
这个 npProductSets 函数会过滤展开的数组,并使用 numpy 方法进行过滤。它仍然比 Python 生成器运行得慢(0.078 秒对 0.054 秒)。 Numpy 不是组合学和集合操作的理想工具。
请注意,npProductSets returns 50014 项而不是您的 58363,因为 tuple(set(i))
不会过滤数字的所有排列。将集合转换为元组并不能保证元素的顺序(因此由于置换项,重复组合包含在输出中)。
你可以这样做:
# create full Cartessian product and keep items in sorted form
arr = np.stack(np.meshgrid(a, b, c), axis=-1).reshape(-1, 3)
arr_sort = np.sort(arr, axis=1)
# apply condition 1: no duplicates between sorted items
u, idx_c1 = np.unique(arr_sort, return_index=True, axis=0)
arr_filter, arr_sort_filter = arr[idx_c1], arr_sort[idx_c1]
# apply condition 2: no items with repeated values between sorted items
idx_c2 = (arr_sort_filter[:,0] != arr_sort_filter[:,1]) & \
(arr_sort_filter[:,1] != arr_sort_filter[:,2])
arr_filter[idx_c2]
>>>
array([[ 1, 2, 0],
[ 1, 3, 0],
[ 1, 4, 0],
...,
[75, 71, 74],
[75, 74, 72],
[75, 74, 73]], dtype=uint8)
我的计算机需要 57 毫秒,而 no_numpy(args?)
和 returns 50014 项需要 77 毫秒。
您稍后可以分析此算法以查看可以优化的内容。我手动完成,但找到一些分析工具是个好主意:)
- arr ~0.2 毫秒
- arr_sort~1.4ms
- u, idx_c1 ~ 52ms
- 剩余部分~2.5ms
所以很容易看到这里一直在消耗什么。使用降维可以显着改善它。其中一种方法是替换
u, idx_c1 = np.unique(arr_sort, return_index=True, axis=0)
与
M = max(np.max(a), np.max(b), np.max(c))
idx = np.ravel_multi_index(arr_sort.T, (M+1, M+1, M+1))
u, idx_c1 = np.unique(idx, return_index=True)
现在只运行了4.5毫秒,总共才运行了9毫秒!我想如果你优化了这些部分,你可以将这个算法加速 ~3 倍:
- 使用
numba
在 idx_c2
中进行更快的比较
- 使用
numba
加速 np.ravel_multi_index
(即使在 numpy
中手动实现也能更快)
- 使用
numba
或 numpy
版本的 np.bincount
而不是 np.unique
为什么当前的实施速度很慢
虽然第一个解决方案比第二个解决方案快,但效率很低,因为它创建了大量临时 CPython 对象(每个 itertools.product
项目至少 6 个)。 创建大量对象是昂贵的,因为它们是动态分配的,并且 reference-counted 由 CPython。 Numpy 函数 cartesian_product
非常快,但是对结果数组的迭代非常慢,因为它创建了很多 Numpy 视图并在 numpy.uint8
而不是 CPython int
上运行. Numpy 类型和函数为非常小的数组引入了巨大的开销。
如@AlainT 所示,Numpy 可用于加速此操作,但这并非易事,而且 Numpy 无法解决此类问题。
如何提高性能
一种解决方案是使用 Numba 自己更有效地完成工作,并让 Numba 的 JIT 编译器 优化循环。您可以使用 3 个嵌套循环来高效地生成笛卡尔积和筛选项的值。字典可用于跟踪已经看到的值。 3 项的元组可以 打包 为一个整数,以减少内存占用并提高性能(因此字典可以更好地适应 CPU 缓存并避免创建缓慢的元组对象)。
这是生成的代码:
import numba as nb
# Signature of the function (parameter types)
# Note: `::1` means the array is contiguous
@nb.njit('(uint8[::1], uint8[::1], uint8[::1])')
def with_numba(a, b, c):
seen = dict()
for va in a:
for vb in b:
for vc in c:
# If the 3 values are different
if va != vb and vc != vb and vc != va:
# Sort the 3 values using a fast sorting network
v1, v2, v3 = va, vb, vc
if v1 > v2: v1, v2 = v2, v1
if v2 > v3: v2, v3 = v3, v2
if v1 > v2: v1, v2 = v2, v1
# Compact the 3 values into one 32-bit integer
packedKey = (np.uint32(v1) << 16) | (np.uint32(v2) << 8) | np.uint32(v3)
# Is the sorted tuple (v1,v2,v3) already seen?
if packedKey not in seen:
# Add the value and remember the ordered tuple (va,vb,vc)
packedValue = (np.uint32(va) << 16) | (np.uint32(vb) << 8) | np.uint32(vc)
seen[packedKey] = packedValue
res = np.empty((len(seen), 3), dtype=np.uint8)
for i, packed in enumerate(seen.values()):
res[i, 0] = np.uint8(packed >> 16)
res[i, 1] = np.uint8(packed >> 8)
res[i, 2] = np.uint8(packed)
return res
with_numba(a, b, c)
基准
以下是我的 i5-9600KF 处理器的结果:
numpy: 122.1 ms (x 1.0)
no_numpy: 49.6 ms (x 2.5)
AlainT's solution: 49.0 ms (x 2.5)
mathfux's solution 34.2 ms (x 3.5)
mathfux's optimized solution 7.5 ms (x16.2)
with_numba: 4.9 ms (x24.9)
所提供的解决方案比最慢的实施方案快 25 倍,比迄今为止最快的实施方案快 1.5 倍。
当前的 Numba 代码受 Numba 字典操作速度的限制。可以使用更多 low-level 技巧优化代码。解决方案是用大小为 256**3/8
的 打包布尔数组 (1 项 = 1 位)替换字典,以跟踪已经看到的值(通过检查 packedKey
位)。如果未设置提取位,则可以直接将打包值添加到 res
中。这需要将 res
预分配到最大大小或实现指数增长的数组(如 Python 中的 list
或 C++ 中的 std::vector
)。另一个优化是对列表进行排序并使用 tiling 策略来改善 cache locality。这样的优化远不容易实现,但我希望它们能大大加快执行速度。
如果您打算使用更多数组,那么 hash-map 可能会成为瓶颈,而 bit-array 可能会很大。虽然使用平铺确实有助于减少内存占用,但您可以使用 Bloom filters 大大加快实施速度。这种概率数据结构可以通过跳过许多重复项来加快执行速度,而不会导致任何 缓存未命中 并且内存占用很小。您可以删除大部分重复项,然后对数组进行排序,以便删除重复项。关于您的问题,基数排序可能比通常的排序算法更快。
输入
import numpy as np
import itertools
a = np.array([ 1, 6, 7, 8, 10, 11, 13, 14, 15, 19, 20, 23, 24, 26, 28, 29, 33,
34, 41, 42, 43, 44, 45, 46, 47, 52, 54, 58, 60, 61, 65, 70, 75]).astype(np.uint8)
b = np.array([ 2, 3, 4, 10, 12, 14, 16, 20, 22, 26, 28, 29, 30, 31, 34, 36, 37,
38, 39, 40, 41, 46, 48, 49, 50, 52, 53, 55, 56, 57, 59, 60, 63, 66,
67, 68, 69, 70, 71, 74]).astype(np.uint8)
c = np.array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
68, 69, 70, 71, 72, 73, 74, 75]).astype(np.uint8)
我想得到 3 个数组的笛卡尔积,但我不希望一行中有任何重复元素 [1, 2, 1]
将无效,并且这两个中只有一个有效 [10, 14, 0]
或 [14, 10, 0]
因为 10 和 14 都在 a
和 b
.
Python 仅
def no_numpy():
combos = {tuple(set(i)): i for i in itertools.product(a, b, c)}
combos = [val for key, val in combos.items() if len(key) == 3]
%timeit no_numpy() # 32.5 ms ± 508 µs per loop
Numpy
# Solution from (
def cartesian_product(*arrays):
broadcastable = np.ix_(*arrays)
broadcasted = np.broadcast_arrays(*broadcastable)
rows, cols = np.prod(broadcasted[0].shape), len(broadcasted)
dtype = np.result_type(*arrays)
out = np.empty(rows * cols, dtype=dtype)
start, end = 0, rows
for a in broadcasted:
out[start:end] = a.reshape(-1)
start, end = end, end + rows
return out.reshape(cols, rows).T
def numpy():
combos = {tuple(set(i)): i for i in cartesian_product(*[a, b, c])}
combos = [val for key, val in combos.items() if len(key) == 3]
%timeit numpy() # 96.2 ms ± 136 µs per loop
我的猜测是在 numpy 版本中将 np.array
转换为集合是它慢得多的原因,但是当严格比较初始产品时 cartesian_product
比 itertools.product
快得多.
是否可以修改 numpy 版本以胜过纯 python 解决方案,或者是否有其他解决方案可以胜过两者?
要让 numpy 的运行速度与过滤后的 python 迭代器一样快将非常困难,因为 numpy 处理的整个结构将不可避免地大于过滤集的结果。
这是我能想到的最好的处理数组乘积的方法,即根据不同值的唯一组合过滤结果:
def npProductSets(a,b,*others):
if len(a.shape)<2 : a = a[:,None]
if len(b.shape)<2 : b = b[:,None]
left = np.repeat(a,b.shape[0],axis=0)
right = np.tile(b,(a.shape[0],1))
distinct = ~np.any(right==left,axis=1)
prod = np.concatenate((left[distinct],right[distinct]),axis=1)
prod.sort(axis=1)
prod = np.unique(prod,axis=0)
if others:
return npProductSets(prod,*others)
return prod
这个 npProductSets 函数会过滤展开的数组,并使用 numpy 方法进行过滤。它仍然比 Python 生成器运行得慢(0.078 秒对 0.054 秒)。 Numpy 不是组合学和集合操作的理想工具。
请注意,npProductSets returns 50014 项而不是您的 58363,因为 tuple(set(i))
不会过滤数字的所有排列。将集合转换为元组并不能保证元素的顺序(因此由于置换项,重复组合包含在输出中)。
你可以这样做:
# create full Cartessian product and keep items in sorted form
arr = np.stack(np.meshgrid(a, b, c), axis=-1).reshape(-1, 3)
arr_sort = np.sort(arr, axis=1)
# apply condition 1: no duplicates between sorted items
u, idx_c1 = np.unique(arr_sort, return_index=True, axis=0)
arr_filter, arr_sort_filter = arr[idx_c1], arr_sort[idx_c1]
# apply condition 2: no items with repeated values between sorted items
idx_c2 = (arr_sort_filter[:,0] != arr_sort_filter[:,1]) & \
(arr_sort_filter[:,1] != arr_sort_filter[:,2])
arr_filter[idx_c2]
>>>
array([[ 1, 2, 0],
[ 1, 3, 0],
[ 1, 4, 0],
...,
[75, 71, 74],
[75, 74, 72],
[75, 74, 73]], dtype=uint8)
我的计算机需要 57 毫秒,而 no_numpy(args?)
和 returns 50014 项需要 77 毫秒。
您稍后可以分析此算法以查看可以优化的内容。我手动完成,但找到一些分析工具是个好主意:)
- arr ~0.2 毫秒
- arr_sort~1.4ms
- u, idx_c1 ~ 52ms
- 剩余部分~2.5ms
所以很容易看到这里一直在消耗什么。使用降维可以显着改善它。其中一种方法是替换
u, idx_c1 = np.unique(arr_sort, return_index=True, axis=0)
与
M = max(np.max(a), np.max(b), np.max(c))
idx = np.ravel_multi_index(arr_sort.T, (M+1, M+1, M+1))
u, idx_c1 = np.unique(idx, return_index=True)
现在只运行了4.5毫秒,总共才运行了9毫秒!我想如果你优化了这些部分,你可以将这个算法加速 ~3 倍:
- 使用
numba
在idx_c2
中进行更快的比较
- 使用
numba
加速np.ravel_multi_index
(即使在numpy
中手动实现也能更快) - 使用
numba
或numpy
版本的np.bincount
而不是np.unique
为什么当前的实施速度很慢
虽然第一个解决方案比第二个解决方案快,但效率很低,因为它创建了大量临时 CPython 对象(每个 itertools.product
项目至少 6 个)。 创建大量对象是昂贵的,因为它们是动态分配的,并且 reference-counted 由 CPython。 Numpy 函数 cartesian_product
非常快,但是对结果数组的迭代非常慢,因为它创建了很多 Numpy 视图并在 numpy.uint8
而不是 CPython int
上运行. Numpy 类型和函数为非常小的数组引入了巨大的开销。
如@AlainT 所示,Numpy 可用于加速此操作,但这并非易事,而且 Numpy 无法解决此类问题。
如何提高性能
一种解决方案是使用 Numba 自己更有效地完成工作,并让 Numba 的 JIT 编译器 优化循环。您可以使用 3 个嵌套循环来高效地生成笛卡尔积和筛选项的值。字典可用于跟踪已经看到的值。 3 项的元组可以 打包 为一个整数,以减少内存占用并提高性能(因此字典可以更好地适应 CPU 缓存并避免创建缓慢的元组对象)。
这是生成的代码:
import numba as nb
# Signature of the function (parameter types)
# Note: `::1` means the array is contiguous
@nb.njit('(uint8[::1], uint8[::1], uint8[::1])')
def with_numba(a, b, c):
seen = dict()
for va in a:
for vb in b:
for vc in c:
# If the 3 values are different
if va != vb and vc != vb and vc != va:
# Sort the 3 values using a fast sorting network
v1, v2, v3 = va, vb, vc
if v1 > v2: v1, v2 = v2, v1
if v2 > v3: v2, v3 = v3, v2
if v1 > v2: v1, v2 = v2, v1
# Compact the 3 values into one 32-bit integer
packedKey = (np.uint32(v1) << 16) | (np.uint32(v2) << 8) | np.uint32(v3)
# Is the sorted tuple (v1,v2,v3) already seen?
if packedKey not in seen:
# Add the value and remember the ordered tuple (va,vb,vc)
packedValue = (np.uint32(va) << 16) | (np.uint32(vb) << 8) | np.uint32(vc)
seen[packedKey] = packedValue
res = np.empty((len(seen), 3), dtype=np.uint8)
for i, packed in enumerate(seen.values()):
res[i, 0] = np.uint8(packed >> 16)
res[i, 1] = np.uint8(packed >> 8)
res[i, 2] = np.uint8(packed)
return res
with_numba(a, b, c)
基准
以下是我的 i5-9600KF 处理器的结果:
numpy: 122.1 ms (x 1.0)
no_numpy: 49.6 ms (x 2.5)
AlainT's solution: 49.0 ms (x 2.5)
mathfux's solution 34.2 ms (x 3.5)
mathfux's optimized solution 7.5 ms (x16.2)
with_numba: 4.9 ms (x24.9)
所提供的解决方案比最慢的实施方案快 25 倍,比迄今为止最快的实施方案快 1.5 倍。
当前的 Numba 代码受 Numba 字典操作速度的限制。可以使用更多 low-level 技巧优化代码。解决方案是用大小为 256**3/8
的 打包布尔数组 (1 项 = 1 位)替换字典,以跟踪已经看到的值(通过检查 packedKey
位)。如果未设置提取位,则可以直接将打包值添加到 res
中。这需要将 res
预分配到最大大小或实现指数增长的数组(如 Python 中的 list
或 C++ 中的 std::vector
)。另一个优化是对列表进行排序并使用 tiling 策略来改善 cache locality。这样的优化远不容易实现,但我希望它们能大大加快执行速度。
如果您打算使用更多数组,那么 hash-map 可能会成为瓶颈,而 bit-array 可能会很大。虽然使用平铺确实有助于减少内存占用,但您可以使用 Bloom filters 大大加快实施速度。这种概率数据结构可以通过跳过许多重复项来加快执行速度,而不会导致任何 缓存未命中 并且内存占用很小。您可以删除大部分重复项,然后对数组进行排序,以便删除重复项。关于您的问题,基数排序可能比通常的排序算法更快。