为什么 numpy 笛卡尔积比纯 python 版本慢?

Why is numpy cartesian product slower than pure python version?

输入

import numpy as np
import itertools

a = np.array([ 1,  6,  7,  8, 10, 11, 13, 14, 15, 19, 20, 23, 24, 26, 28, 29, 33,
       34, 41, 42, 43, 44, 45, 46, 47, 52, 54, 58, 60, 61, 65, 70, 75]).astype(np.uint8)
b = np.array([ 2,  3,  4, 10, 12, 14, 16, 20, 22, 26, 28, 29, 30, 31, 34, 36, 37,
       38, 39, 40, 41, 46, 48, 49, 50, 52, 53, 55, 56, 57, 59, 60, 63, 66,
       67, 68, 69, 70, 71, 74]).astype(np.uint8)
c = np.array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
       34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
       51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
       68, 69, 70, 71, 72, 73, 74, 75]).astype(np.uint8)

我想得到 3 个数组的笛卡尔积,但我不希望一行中有任何重复元素 [1, 2, 1] 将无效,并且这两个中只有一个有效 [10, 14, 0][14, 10, 0] 因为 10 和 14 都在 ab.

Python 仅

def no_numpy():
    combos = {tuple(set(i)): i for i in itertools.product(a, b, c)}
    combos = [val for key, val in combos.items() if len(key) == 3]
%timeit no_numpy() # 32.5 ms ± 508 µs per loop

Numpy

# Solution from (
def cartesian_product(*arrays):
    broadcastable = np.ix_(*arrays)
    broadcasted = np.broadcast_arrays(*broadcastable)
    rows, cols = np.prod(broadcasted[0].shape), len(broadcasted)
    dtype = np.result_type(*arrays)

    out = np.empty(rows * cols, dtype=dtype)
    start, end = 0, rows
    for a in broadcasted:
        out[start:end] = a.reshape(-1)
        start, end = end, end + rows
    return out.reshape(cols, rows).T

def numpy():
    combos = {tuple(set(i)): i for i in cartesian_product(*[a, b, c])}
    combos = [val for key, val in combos.items() if len(key) == 3]
%timeit numpy() # 96.2 ms ± 136 µs per loop

我的猜测是在 numpy 版本中将 np.array 转换为集合是它慢得多的原因,但是当严格比较初始产品时 cartesian_productitertools.product 快得多.

是否可以修改 numpy 版本以胜过纯 python 解决方案,或者是否有其他解决方案可以胜过两者?

要让 numpy 的运行速度与过滤后的 python 迭代器一样快将非常困难,因为 numpy 处理的整个结构将不可避免地大于过滤集的结果。

这是我能想到的最好的处理数组乘积的方法,即根据不同值的唯一组合过滤结果:

def npProductSets(a,b,*others):
    if len(a.shape)<2 : a = a[:,None]
    if len(b.shape)<2 : b = b[:,None]
    left  = np.repeat(a,b.shape[0],axis=0)
    right = np.tile(b,(a.shape[0],1))
    distinct = ~np.any(right==left,axis=1)
    prod  = np.concatenate((left[distinct],right[distinct]),axis=1)
    prod.sort(axis=1)
    prod  = np.unique(prod,axis=0) 
    if others:
        return npProductSets(prod,*others)
    return prod

这个 npProductSets 函数会过滤展开的数组,并使用 numpy 方法进行过滤。它仍然比 Python 生成器运行得慢(0.078 秒对 0.054 秒)。 Numpy 不是组合学和集合操作的理想工具。

请注意,npProductSets returns 50014 项而不是您的 58363,因为 tuple(set(i)) 不会过滤数字的所有排列。将集合转换为元组并不能保证元素的顺序(因此由于置换项,重复组合包含在输出中)。

你可以这样做:

# create full Cartessian product and keep items in sorted form
arr = np.stack(np.meshgrid(a, b, c), axis=-1).reshape(-1, 3)
arr_sort = np.sort(arr, axis=1)

# apply condition 1: no duplicates between sorted items
u, idx_c1 = np.unique(arr_sort, return_index=True, axis=0)
arr_filter, arr_sort_filter = arr[idx_c1], arr_sort[idx_c1]

# apply condition 2: no items with repeated values between sorted items
idx_c2 = (arr_sort_filter[:,0] != arr_sort_filter[:,1]) & \
           (arr_sort_filter[:,1] != arr_sort_filter[:,2])

arr_filter[idx_c2]
>>>
array([[ 1,  2,  0],
       [ 1,  3,  0],
       [ 1,  4,  0],
                ...,
       [75, 71, 74],
       [75, 74, 72],
       [75, 74, 73]], dtype=uint8)

我的计算机需要 57 毫秒,而 no_numpy(args?) 和 returns 50014 项需要 77 毫秒。

您稍后可以分析此算法以查看可以优化的内容。我手动完成,但找到一些分析工具是个好主意:)

  • arr ~0.2 毫秒
  • arr_sort~1.4ms
  • u, idx_c1 ~ 52ms
  • 剩余部分~2.5ms

所以很容易看到这里一直在消耗什么。使用降维可以显着改善它。其中一种方法是替换

u, idx_c1 = np.unique(arr_sort, return_index=True, axis=0)

M = max(np.max(a), np.max(b), np.max(c))
idx = np.ravel_multi_index(arr_sort.T, (M+1, M+1, M+1))
u, idx_c1 = np.unique(idx, return_index=True) 

现在只运行了4.5毫秒,总共才运行了9毫秒!我想如果你优化了这些部分,你可以将这个算法加速 ~3 倍:

  • 使用 numbaidx_c2
  • 中进行更快的比较
  • 使用 numba 加速 np.ravel_multi_index(即使在 numpy 中手动实现也能更快)
  • 使用 numbanumpy 版本的 np.bincount 而不是 np.unique

为什么当前的实施速度很慢

虽然第一个解决方案比第二个解决方案快,但效率很低,因为它创建了大量临时 CPython 对象(每个 itertools.product 项目至少 6 个)。 创建大量对象是昂贵的,因为它们是动态分配的,并且 reference-counted 由 CPython。 Numpy 函数 cartesian_product 非常快,但是对结果数组的迭代非常慢,因为它创建了很多 Numpy 视图并在 numpy.uint8 而不是 CPython int 上运行. Numpy 类型和函数为非常小的数组引入了巨大的开销

如@AlainT 所示,Numpy 可用于加速此操作,但这并非易事,而且 Numpy 无法解决此类问题。


如何提高性能

一种解决方案是使用 Numba 自己更有效地完成工作,并让 Numba 的 JIT 编译器 优化循环。您可以使用 3 个嵌套循环来高效地生成笛卡尔积和筛选项的值。字典可用于跟踪已经看到的值。 3 项的元组可以 打包 为一个整数,以减少内存占用并提高性能(因此字典可以更好地适应 CPU 缓存并避免创建缓慢的元组对象)。

这是生成的代码:

import numba as nb

# Signature of the function (parameter types)
# Note: `::1` means the array is contiguous
@nb.njit('(uint8[::1], uint8[::1], uint8[::1])')
def with_numba(a, b, c):
    seen = dict()

    for va in a:
        for vb in b:
            for vc in c:
                # If the 3 values are different
                if va != vb and vc != vb and vc != va:
                    # Sort the 3 values using a fast sorting network
                    v1, v2, v3 = va, vb, vc
                    if v1 > v2: v1, v2 = v2, v1
                    if v2 > v3: v2, v3 = v3, v2
                    if v1 > v2: v1, v2 = v2, v1

                    # Compact the 3 values into one 32-bit integer
                    packedKey = (np.uint32(v1) << 16) | (np.uint32(v2) << 8) | np.uint32(v3)

                    # Is the sorted tuple (v1,v2,v3) already seen?
                    if packedKey not in seen:
                        # Add the value and remember the ordered tuple (va,vb,vc)
                        packedValue = (np.uint32(va) << 16) | (np.uint32(vb) << 8) | np.uint32(vc)
                        seen[packedKey] = packedValue

    res = np.empty((len(seen), 3), dtype=np.uint8)

    for i, packed in enumerate(seen.values()):
        res[i, 0] = np.uint8(packed >> 16)
        res[i, 1] = np.uint8(packed >> 8)
        res[i, 2] = np.uint8(packed)

    return res

with_numba(a, b, c)

基准

以下是我的 i5-9600KF 处理器的结果:

numpy:                         122.1 ms  (x 1.0)
no_numpy:                       49.6 ms  (x 2.5)
AlainT's solution:              49.0 ms  (x 2.5)
mathfux's solution              34.2 ms  (x 3.5)
mathfux's optimized solution     7.5 ms  (x16.2)
with_numba:                      4.9 ms  (x24.9)

所提供的解决方案比最慢的实施方案快 25 倍,比迄今为止最快的实施方案快 1.5 倍

当前的 Numba 代码受 Numba 字典操作速度的限制。可以使用更多 low-level 技巧优化代码。解决方案是用大小为 256**3/8 打包布尔数组 (1 项 = 1 位)替换字典,以跟踪已经看到的值(通过检查 packedKey 位)。如果未设置提取位,则可以直接将打包值添加到 res 中。这需要将 res 预分配到最大大小或实现指数增长的数组(如 Python 中的 list 或 C++ 中的 std::vector)。另一个优化是对列表进行排序并使用 tiling 策略来改善 cache locality。这样的优化远不容易实现,但我希望它们能大大加快执行速度。

如果您打算使用更多数组,那么 hash-map 可能会成为瓶颈,而 bit-array 可能会很大。虽然使用平铺确实有助于减少内存占用,但您可以使用 Bloom filters 大大加快实施速度。这种概率数据结构可以通过跳过许多重复项来加快执行速度,而不会导致任何 缓存未命中 并且内存占用很小。您可以删除大部分重复项,然后对数组进行排序,以便删除重复项。关于您的问题,基数排序可能比通常的排序算法更快。