使用 numpy 向量化 "pure" 函数,假设有很多重复项

Vectorizing a "pure" function with numpy, assuming many duplicates

我想将 "black box" Python 函数 f 应用于大型数组 arr。其他假设是:

我可以使用装饰器来实现这一点,该装饰器为 arr 的每个唯一元素计算 f,如下所示:

import numpy as np
from time import sleep
from functools import wraps


N = 1000
np.random.seed(0)
arr = np.random.randint(0, 10, size=(N, 2))


def vectorize_pure(f):
    @wraps(f)
    def f_vec(arr):
        uniques, ix = np.unique(arr, return_inverse=True)
        f_range = np.array([f(x) for x in uniques])
        return f_range[ix].reshape(arr.shape)
    return f_vec


@np.vectorize
def usual_vectorize(x):
    sleep(0.001)
    return x


@vectorize_pure
def pure_vectorize(x):
    sleep(0.001)
    return x

# In [47]: %timeit usual_vectorize(arr)                                
# 1.33 s ± 6.16 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# In [48]: %timeit pure_vectorize(arr)                                 
# 13.6 ms ± 81.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

我担心的是 np.unique 在后台对 arr 进行排序,考虑到假设,这似乎效率不高。我正在寻找一种实用的方法来实现类似的装饰器

  1. 利用快速 numpy 向量化操作。
  2. 不对输入数组进行排序。

我怀疑答案是 "yes" 使用 numba,但我对 numpy 解决方案特别感兴趣。

此外,根据 arr 数据类型,numpy 可能会使用基数排序,因此 unique 的性能在某些情况下可能会很好。


我在下面找到了一个解决方法,使用 pandas.unique;然而,它仍然需要两次遍历原始数组,并且 pandas.unique 做了一些额外的工作。我想知道 pandas._libs.hashtablecython 或其他任何东西是否存在更好的解决方案。

仔细研究后,这里有一种方法使用 pandas.unique(基于散列)而不是 numpy.unique(基于排序)。

import pandas as pd

def vectorize_with_pandas(f):
    @wraps(f)
    def f_vec(arr):
        uniques = np.sort(pd.unique(arr.ravel()))
        f_range = np.array([f(x) for x in uniques])
        return f_range[
            np.searchsorted(uniques, arr.ravel())
            ].reshape(arr.shape)
    return f_vec

提供以下性能提升:

N = 1_000_000
np.random.seed(0)
arr = np.random.randint(0, 10, size=(N, 2)).astype(float)

@vectorize_with_pandas
def pandas_vectorize(x):
    sleep(0.001)
    return x

In [33]: %timeit pure_vectorize(arr)                                         
152 ms ± 2.34 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [34]: %timeit pandas_vectorize(arr)                                       
76.8 ms ± 582 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

此外,根据 Warren Weckesser 的 ,如果 arr 是一个小整数数组,例如uint8。例如,

def unique_uint8(arr):
    q = np.zeros(256, dtype=int)
    q[arr.ravel()] = 1
    return np.nonzero(q)[0]


def vectorize_uint8(f):
    @wraps(f)
    def f_vec(arr):
        uniques = unique_uint8(arr)
        f_range = np.array([f(x) for x in uniques])
        return f_range[
            np.searchsorted(uniques, arr.ravel())
            ].reshape(arr.shape)
    return f_vec

下面的装饰器是:

  • 比您的 usual_vectorize
  • 快 10 倍
  • 比你的 vectorize_pure
  • 慢 10 倍
  • 没有进行任何排序(据我所知)
  • 使用numpy 向量化运算

代码:

def vectorize_pure2(f):
    @wraps(f)
    def f_vec(arr):
        tups = [tuple(x) for x in arr]
        tups_rows = dict(zip(tups, arr))
        new_arr = np.ndarray(arr.shape)
        for row in tups_rows.values():
            row_ixs = (arr == row).all(axis=1)
            new_arr[row_ixs] = f(row)
        return new_arr
    return f_vec

性能:

@vectorize_pure2
def pure_vectorize2(x):
    sleep(0.001)
    return x

In [49]: %timeit pure_vectorize2(arr)
135 ms ± 879 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

这个答案的一些功劳:

您实际上可以一次遍历数组来执行此操作,但是它要求您事先知道结果的 dtype。否则,您需要对元素进行第二次遍历以确定它。

暂时忽略性能(和 functools.wraps),实现可能如下所示:

def vectorize_cached(output_dtype):
    def vectorize_cached_factory(f):
        def f_vec(arr):
            flattened = arr.ravel()
            if output_dtype is None:
                result = np.empty_like(flattened)
            else:
                result = np.empty(arr.size, output_dtype)

            cache = {}
            for idx, item in enumerate(flattened):
                res = cache.get(item)
                if res is None:
                    res = f(item)
                    cache[item] = res
                result[idx] = res
            return result.reshape(arr.shape)
        return f_vec
    return vectorize_cached_factory

它首先创建结果数组,然后遍历输入数组。一旦遇到字典中不存在的元素,就会调用该函数(并存储结果)——否则它只会使用字典中存储的值。

@vectorize_cached(np.float64)
def t(x):
    print(x)
    return x + 2.5

>>> t(np.array([1,1,1,2,2,2,3,3,1,1,1]))
1
2
3
array([3.5, 3.5, 3.5, 4.5, 4.5, 4.5, 5.5, 5.5, 3.5, 3.5, 3.5])

但是这并不是特别快,因为我们正在对 NumPy 数组执行 Python 循环。

Cython 解决方案

为了让它更快,我们实际上可以将此实现移植到 Cython(目前仅支持 float32、float64、int32、int64、uint32 和 uint64,但扩展几乎微不足道,因为它使用融合类型):

%%cython

cimport numpy as cnp

ctypedef fused input_type:
    cnp.float32_t
    cnp.float64_t
    cnp.uint32_t
    cnp.uint64_t
    cnp.int32_t
    cnp.int64_t

ctypedef fused result_type:
    cnp.float32_t
    cnp.float64_t
    cnp.uint32_t
    cnp.uint64_t
    cnp.int32_t
    cnp.int64_t

cpdef void vectorized_cached_impl(input_type[:] array, result_type[:] result, object func):
    cdef dict cache = {}
    cdef Py_ssize_t idx
    cdef input_type item
    for idx in range(array.size):
        item = array[idx]
        res = cache.get(item)
        if res is None:
            res = func(item)
            cache[item] = res
        result[idx] = res

使用Python装饰器(以下代码未使用Cython编译):

def vectorize_cached_cython(output_dtype):
    def vectorize_cached_factory(f):
        def f_vec(arr):
            flattened = arr.ravel()
            if output_dtype is None:
                result = np.empty_like(flattened)
            else:
                result = np.empty(arr.size, output_dtype)

            vectorized_cached_impl(flattened, result, f)

            return result.reshape(arr.shape)
        return f_vec
    return vectorize_cached_factory

同样,这只执行一次,并且每个唯一值只应用一次函数:

@vectorize_cached_cython(np.float64)
def t(x):
    print(x)
    return x + 2.5

>>> t(np.array([1,1,1,2,2,2,3,3,1,1,1]))
1
2
3
array([3.5, 3.5, 3.5, 4.5, 4.5, 4.5, 5.5, 5.5, 3.5, 3.5, 3.5])

基准:快速函数,大量重复

但问题是:在这里使用 Cython 有意义吗?

我做了一个快速基准测试(没有 sleep)以了解性能有何不同(使用我的库 simple_benchmark):

def func_to_vectorize(x):
    return x

usual_vectorize = np.vectorize(func_to_vectorize)
pure_vectorize = vectorize_pure(func_to_vectorize)
pandas_vectorize = vectorize_with_pandas(func_to_vectorize)
cached_vectorize = vectorize_cached(None)(func_to_vectorize) 
cython_vectorize = vectorize_cached_cython(None)(func_to_vectorize) 


from simple_benchmark import BenchmarkBuilder

b = BenchmarkBuilder()
b.add_function(alias='usual_vectorize')(usual_vectorize)
b.add_function(alias='pure_vectorize')(pure_vectorize)
b.add_function(alias='pandas_vectorize')(pandas_vectorize)
b.add_function(alias='cached_vectorize')(cached_vectorize)
b.add_function(alias='cython_vectorize')(cython_vectorize)

@b.add_arguments('array size')
def argument_provider():
    np.random.seed(0)
    for exponent in range(6, 20):
        size = 2**exponent
        yield size, np.random.randint(0, 10, size=(size, 2))

r = b.run()
r.plot()

根据这些时间排名(从快到慢):

  • Cython 版本
  • Pandas解决方案(来自
  • 纯溶液(原post)
  • NumPys 向量化
  • 使用缓存的非 Cython 版本

如果函数调用非常便宜,那么普通的 NumPy 解决方案只会慢 5-10 倍。 pandas 解决方案还有一个更大的常数因子,使其成为非常小的数组中最慢的解决方案。

基准:昂贵的函数(time.sleep(0.001)),大量重复

如果函数调用实际上很昂贵(如 time.sleep),np.vectorize 解决方案将 慢很多 ,但是要少得多其他解决方案之间的区别:

# This shows only the difference compared to the previous benchmark
def func_to_vectorize(x):
    sleep(0.001)
    return x

@b.add_arguments('array size')
def argument_provider():
    np.random.seed(0)
    for exponent in range(5, 10):
        size = 2**exponent
        yield size, np.random.randint(0, 10, size=(size, 2))

基准测试:快速函数,很少重复

然而,如果你没有那么多重复项,那么普通 np.vectorize 几乎与纯 pandas 解决方案一样快,只比 Cython 版本慢一点:

# Again just difference to the original benchmark is shown
@b.add_arguments('array size')
def argument_provider():
    np.random.seed(0)
    for exponent in range(6, 20):
        size = 2**exponent
        # Maximum value is now depending on the size to ensures there 
        # are less duplicates in the array
        yield size, np.random.randint(0, size // 10, size=(size, 2))

这个问题实际上很有趣,因为它是计算时间和内存消耗之间权衡的完美示例。

从算法的角度寻找唯一元素,并最终只计算唯一元素,可以通过两种方式实现:

  • 两次(或多次)通过方法:

    • 找出所有唯一元素
    • 找出唯一元素在哪里
    • 计算唯一元素的函数
    • 将所有计算出的唯一元素放在正确的位置
  • 单程方法:

    • 随时随地计算元素并缓存结果
    • 如果某个元素在缓存中,则从那里获取它

算法的复杂性取决于输入的大小 N 和唯一元素的数量 U。后者也可以使用独特元素的 r = U / N 比率来形式化。

更多遍 方法在理论上更慢。但是,对于小NU来说,它们还是相当有竞争力的。 single-pass 方法在理论上更快,但这在很大程度上也取决于 缓存 方法以及它们如何执行取决于 U. 当然,无论渐近行为多么重要,实际时间确实取决于常数计算时间因素。 这个问题中最相关的是 func() 计算时间。

接近

可以比较多种方法:

  • 未缓存

    • pure() 这将是基本函数并且可能已经被矢量化
    • np.vectorized() 这将是 NumPy 标准矢量化装饰器
  • 更多通行证接近

    • np_unique():使用 np.unique() 找到唯一值并使用索引(来自 np.unique() 输出)构造结果(本质上等同于 vectorize_pure() 来自 )
    • pd_unique():使用 pd.unique() 找到唯一值并使用索引(通过 np.searchsorted())构造结果(本质上等同于 vectorize_with_pandas() 来自 )
    • set_unique():仅使用 set() 找到唯一值并使用索引(通过 np.searchsorted())构建结果
    • set_unique_msk():使用简单的 set()(如 set_unique())找到唯一值,并使用循环和掩码来构造结果(而不是索引)
    • nb_unique():使用 numba JIT 加速
    • 的显式循环找到唯一值及其索引
    • cy_unique():使用显式循环 cython
    • 找到唯一值及其索引
  • 单程接近

    • cached_dict():使用 Python dict 进行缓存(O(1) 查找)
    • cached_dict_cy():与上面相同,但使用了 Cython(本质上等同于 中的 vectorized_cached_impl()
    • cached_arr_cy():使用数组进行缓存(O(U) 查找)

纯()

def pure(x):
    return 2 * x

np.vectorized()

import numpy as np


vectorized = np.vectorize(pure)
vectorized.__name__ = 'vectorized'

np_unique()

import functools
import numpy as np


def vectorize_np_unique(func):
    @functools.wraps(func)
    def func_vect(arr):
        uniques, ix = np.unique(arr, return_inverse=True)
        result = np.array([func(x) for x in uniques])
        return result[ix].reshape(arr.shape)
    return func_vect

np_unique = vectorize_np_unique(pure)
np_unique.__name__ = 'np_unique'

pd_unique()

import functools
import numpy as np
import pandas as pd


def vectorize_pd_unique(func):
    @functools.wraps(func)
    def func_vect(arr):
        shape = arr.shape
        arr = arr.ravel()
        uniques = np.sort(pd.unique(arr))
        f_range = np.array([func(x) for x in uniques])
        return f_range[np.searchsorted(uniques, arr)].reshape(shape)
    return func_vect

pd_unique = vectorize_pd_unique(pure)
pd_unique.__name__ = 'pd_unique'

set_unique()

import functools


def vectorize_set_unique(func):
    @functools.wraps(func)
    def func_vect(arr):
        shape = arr.shape
        arr = arr.ravel()
        uniques = sorted(set(arr))
        result = np.array([func(x) for x in uniques])
        return result[np.searchsorted(uniques, arr)].reshape(shape)
    return func_vect

set_unique = vectorize_set_unique(pure)
set_unique.__name__ = 'set_unique'

set_unique_msk()

import functools


def vectorize_set_unique_msk(func):
    @functools.wraps(func)
    def func_vect(arr):
        result = np.empty_like(arr)
        for x in set(arr.ravel()):
            result[arr == x] = func(x)
        return result
    return func_vect

set_unique_msk = vectorize_set_unique_msk(pure)
set_unique_msk.__name__ = 'set_unique_msk'

nb_unique()

import functools
import numpy as np
import numba as nb
import flyingcircus as fc


@nb.jit(forceobj=False, nopython=True, nogil=True, parallel=True)
def numba_unique(arr, max_uniques):
    ix = np.empty(arr.size, dtype=np.int64)
    uniques = np.empty(max_uniques, dtype=arr.dtype)
    j = 0
    for i in range(arr.size):
        found = False
        for k in nb.prange(j):
            if arr[i] == uniques[k]:
                found = True
                break
        if not found:
            uniques[j] = arr[i]
            j += 1            
    uniques = np.sort(uniques[:j])
    # : get indices
    num_uniques = j
    for j in nb.prange(num_uniques):
        x = uniques[j]
        for i in nb.prange(arr.size):
            if arr[i] == x:
                ix[i] = j
    return uniques, ix


@fc.base.parametric
def vectorize_nb_unique(func, max_uniques=-1):
    @functools.wraps(func)
    def func_vect(arr):
        nonlocal max_uniques
        shape = arr.shape
        arr = arr.ravel()
        if max_uniques <= 0:
            m = arr.size
        elif isinstance(max_uniques, int):
            m = min(max_uniques, arr.size)
        elif isinstance(max_uniques, float):
            m = int(arr.size * min(max_uniques, 1.0))
        uniques, ix = numba_unique(arr, m)
        result = np.array([func(x) for x in uniques])
        return result[ix].reshape(shape)
    return func_vect

nb_unique = vectorize_nb_unique()(pure)
nb_unique.__name__ = 'nb_unique'

cy_unique()

%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True
import numpy as np
import cython as cy

cimport cython as ccy
cimport numpy as cnp


ctypedef fused arr_t:
    cnp.uint16_t
    cnp.uint32_t
    cnp.uint64_t
    cnp.int16_t
    cnp.int32_t
    cnp.int64_t
    cnp.float32_t
    cnp.float64_t
    cnp.complex64_t
    cnp.complex128_t


def sort_numpy(arr_t[:] a):
    np.asarray(a).sort()


cpdef cnp.int64_t cython_unique(
        arr_t[:] arr,
        arr_t[::1] uniques,
        cnp.int64_t[:] ix):
    cdef size_t size = arr.size
    cdef arr_t x
    cdef cnp.int64_t i, j, k, num_uniques
    j = 0
    for i in range(size):
        found = False
        for k in range(j):
            if arr[i] == uniques[k]:
                found = True
                break
        if not found:
            uniques[j] = arr[i]
            j += 1            
    sort_numpy(uniques[:j])
    num_uniques = j
    for j in range(num_uniques):
        x = uniques[j]
        for i in range(size):
            if arr[i] == x:
                ix[i] = j
    return num_uniques
import functools
import numpy as np
import flyingcircus as fc


@fc.base.parametric
def vectorize_cy_unique(func, max_uniques=0):
    @functools.wraps(func)
    def func_vect(arr):
        shape = arr.shape
        arr = arr.ravel()
        if max_uniques <= 0:
            m = arr.size
        elif isinstance(max_uniques, int):
            m = min(max_uniques, arr.size)
        elif isinstance(max_uniques, float):
            m = int(arr.size * min(max_uniques, 1.0))
        ix = np.empty(arr.size, dtype=np.int64)
        uniques = np.empty(m, dtype=arr.dtype)
        num_uniques = cy_uniques(arr, uniques, ix)
        uniques = uniques[:num_uniques]
        result = np.array([func(x) for x in uniques])
        return result[ix].reshape(shape)
    return func_vect

cy_unique = vectorize_cy_unique()(pure)
cy_unique.__name__ = 'cy_unique'

cached_dict()

import functools
import numpy as np


def vectorize_cached_dict(func):
    @functools.wraps(func)
    def func_vect(arr):
        result = np.empty_like(arr.ravel())
        cache = {}
        for i, x in enumerate(arr.ravel()):
            if x not in cache:
                cache[x] = func(x)
            result[i] = cache[x]
        return result.reshape(arr.shape)
    return func_vect

cached_dict = vectorize_cached_dict(pure)
cached_dict.__name__ = 'cached_dict'

cached_dict_cy()

%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True
import numpy as np
import cython as cy

cimport cython as ccy
cimport numpy as cnp


ctypedef fused arr_t:
    cnp.uint16_t
    cnp.uint32_t
    cnp.uint64_t
    cnp.int16_t
    cnp.int32_t
    cnp.int64_t
    cnp.float32_t
    cnp.float64_t
    cnp.complex64_t
    cnp.complex128_t

ctypedef fused result_t:
    cnp.uint16_t
    cnp.uint32_t
    cnp.uint64_t
    cnp.int16_t
    cnp.int32_t
    cnp.int64_t
    cnp.float32_t
    cnp.float64_t
    cnp.complex64_t
    cnp.complex128_t


cpdef void apply_cached_dict_cy(arr_t[:] arr, result_t[:] result, object func):
    cdef size_t size = arr.size
    cdef size_t i
    cdef dict cache = {}
    cdef arr_t x
    cdef result_t y
    for i in range(size):
        x = arr[i]
        if x not in cache:
            y = func(x)
            cache[x] = y
        else:
            y = cache[x]
        result[i] = y
import functools
import flyingcircus as fc


@fc.base.parametric
def vectorize_cached_dict_cy(func, dtype=None):
    @functools.wraps(func)
    def func_vect(arr):
        nonlocal dtype
        shape = arr.shape
        arr = arr.ravel()
        result = np.empty_like(arr) if dtype is None else np.empty(arr.shape, dtype=dtype)
        apply_cached_dict_cy(arr, result, func)
        return np.reshape(result, shape)
    return func_vect

cached_dict_cy = vectorize_cached_dict_cy()(pure)
cached_dict_cy.__name__ = 'cached_dict_cy'

cached_arr_cy()

%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True
import numpy as np
import cython as cy

cimport cython as ccy
cimport numpy as cnp


ctypedef fused arr_t:
    cnp.uint16_t
    cnp.uint32_t
    cnp.uint64_t
    cnp.int16_t
    cnp.int32_t
    cnp.int64_t
    cnp.float32_t
    cnp.float64_t
    cnp.complex64_t
    cnp.complex128_t

ctypedef fused result_t:
    cnp.uint16_t
    cnp.uint32_t
    cnp.uint64_t
    cnp.int16_t
    cnp.int32_t
    cnp.int64_t
    cnp.float32_t
    cnp.float64_t
    cnp.complex64_t
    cnp.complex128_t


cpdef void apply_cached_arr_cy(
        arr_t[:] arr,
        result_t[:] result,
        object func,
        arr_t[:] uniques,
        result_t[:] func_uniques):
    cdef size_t i
    cdef size_t j
    cdef size_t k
    cdef size_t size = arr.size
    j = 0
    for i in range(size):
        found = False
        for k in range(j):
            if arr[i] == uniques[k]:
                found = True
                break
        if not found:
            uniques[j] = arr[i]
            func_uniques[j] = func(arr[i])
            result[i] = func_uniques[j]
            j += 1
        else:
            result[i] = func_uniques[k]
import functools
import numpy as np
import flyingcircus as fc


@fc.base.parametric
def vectorize_cached_arr_cy(func, dtype=None, max_uniques=None):
    @functools.wraps(func)
    def func_vect(arr):
        nonlocal dtype, max_uniques
        shape = arr.shape
        arr = arr.ravel()
        result = np.empty_like(arr) if dtype is None else np.empty(arr.shape, dtype=dtype)
        if max_uniques is None or max_uniques <= 0:
            max_uniques = arr.size
        elif isinstance(max_uniques, int):
            max_uniques = min(max_uniques, arr.size)
        elif isinstance(max_uniques, float):
            max_uniques = int(arr.size * min(max_uniques, 1.0))
        uniques = np.empty(max_uniques, dtype=arr.dtype)
        func_uniques = np.empty_like(arr) if dtype is None else np.empty(max_uniques, dtype=dtype)
        apply_cached_arr_cy(arr, result, func, uniques, func_uniques)
        return np.reshape(result, shape)
    return func_vect

cached_arr_cy = vectorize_cached_arr_cy()(pure)
cached_arr_cy.__name__ = 'cached_arr_cy'

备注

元装饰器@parametric(灵感来自here and available in FlyingCircus as flyingcircus.base.parametric)定义如下:

def parametric(decorator):
    @functools.wraps(decorator)
    def _decorator(*_args, **_kws):
        def _wrapper(func):
            return decorator(func, *_args, **_kws)

        return _wrapper

    return _decorator

Numba 无法比常规 Python 代码更有效地处理 单通道 方法,因为传递任意 callable 需要 Python object 启用支持,从而排除快速 JIT 循环。

Cython 有一些限制,您需要指定预期的结果数据类型。你也可以从输入的数据类型来初步猜测,但那不是很理想。

为了简单起见,一些需要临时存储的实现是使用静态 NumPy 数组实现的。例如,可以使用 C++ 中的动态数组改进这些实现,而不会降低速度,但会大大改善内存占用。

基准

只有 10 个唯一值(小于 ~0.05%)的慢函数

(这本质上是原始 post 的用例)。

具有 ~0.05% 唯一值的快速函数

具有约 10% 唯一值的快速函数

具有约 20% 唯一值的快速函数

完整的基准代码(基于this template) is available here.

讨论与结论

最快的方法取决于 NU。 对于慢速函数,所有缓存方法都比 vectorized() 更快。这个结果当然要半信半疑,因为这里测试的 slow 函数比 fast 函数慢了 ~4 个数量级,这么慢的解析函数真的不算太常见的。 如果函数可以立即以矢量化形式编写,那是目前为止最快的方法。

一般来说,只要 U / N 是 ~20% 或更少,cached_dict_cy()vectorized() 内存效率更高且更快(即使对于快速函数)。 它的主要缺点是需要 Cython,这是一个有点复杂的依赖项,它还需要指定结果数据类型。 np_unique() 方法比 vectorized() 更快(即使对于快速函数)只要 U / N 是 ~10% 或更少。 pd_unique() 方法仅对非常小的 U 和慢速函数具有竞争力。

对于非常小的 U,散列法的益处略小,cached_arr_cy() 是最快的方法。