使用 numpy 数组进行条件向量化计算而不使用直接掩码
conditional vectorized calculation with numpy arrays without using direct masking
import numpy as np
repeat=int(1e5)
r_base = np.linspace(0,4,5)
a_base = 2
np.random.seed(0)
r_mat = r_base * np.random.uniform(0.9,1.1,(repeat,5))
a_array = a_base * np.random.uniform(0.9,1.1, repeat)
# original slow approach
def func_vetorized_level1(r_row, a):
if r_row.mean()>2:
result = np.where((r_row >= a), r_row - a, np.nan)
else:
result = np.where((r_row >= a), r_row + a, 0)
return result
# try to broadcast this func to every row of r_mat using list comprehension
def func_list_level2(r_mat, a_array):
res_mat = np.array([func_vetorized_level1(this_r_row, this_a)
for this_r_row, this_a in zip(r_mat, a_array)])
return res_mat
# faster with direct masking, but with unnecessary more calculation
def f_faster(r_mat,a_array):
a = a_array[:, None] # to column vector
row_mask = (r_mat.mean(axis=1) > 2)[:,None]
elem_mask = r_mat >= a
out = np.empty_like(r_mat)
out[row_mask & elem_mask] = (r_mat - a)[row_mask & elem_mask]
out[~row_mask & elem_mask] = (r_mat + a)[~row_mask & elem_mask]
out[row_mask & ~elem_mask] = np.nan
out[~row_mask & ~elem_mask] = 0
return out
# fastest with ufunc in numpy as suggested by @mad_physicist
def f_fastest(r_mat,a_array):
a = a_array[:, None] # to column vector
row_mask = (r_mat.mean(axis=1) > 2)[:,None]
elem_mask = r_mat >= a
out = np.empty_like(r_mat)
np.subtract(r_mat, a, out=out, where=row_mask & elem_mask)
np.add(r_mat, a, out=out, where=~row_mask & elem_mask)
out[row_mask & ~elem_mask] = np.nan
out[~row_mask & ~elem_mask] = 0
return out
请问是否可以有一个用户自定义的func可以使用,或者利用最快的方法?我考虑过使用索引,但发现它具有挑战性,因为使用 [row_ind, co_ind]
的切片元素是所选元素的一维数组。我看到可以使用 reshape
将切片矩阵放入矩阵,但是有一种优雅的方法吗?理想情况下,此 r_mat + a
操作可以由用户定义的函数替换。
你绝对可以有一个带有用户定义函数的矢量化解决方案,只要该函数被矢量化以在一维数组上工作element-wise(使用 numpy 函数编写的任何东西都应该是这种情况)盒子)。
假设您有 r_mat
作为 (m, n)
矩阵,a_array
作为 (m,)
向量。您可以编写函数来接受挂钩。每个钩子都可以是常量或可调用对象。如果它是可调用的,则使用两个相同长度的数组调用它,并且必须 return 第三个相同长度的数组。您可以更改该合约以包含指数或您想要的任何内容:
def f(r_mat, a_array, hook11, hook01, hook10, hook00):
a = a_array[:, None] # to column vector
row_mask = (r_mat.mean(axis=1) > 2)[:,None]
elem_mask = r_mat >= a
out = np.empty_like(r_mat)
def apply_hook(mask, hook):
r, c = np.nonzero(mask)
out[r, c] = hook(r_mat[r, c], a_array[r]) if callable(hook) else hook
apply_hook(row_mask & elem_mask, hook11)
apply_hook(~row_mask & elem_mask, hook01)
apply_hook(row_mask & ~elem_mask, hook10)
apply_hook(~row_mask & ~elem_mask, hook00)
return out
您代码中的当前配置将被称为
f(r_mat, a_array, np.subtract, np.add, np.nan, 0)
假设您想做一些比 np.subtract
更复杂的事情。你可以这样做,例如:
def my_complicated_func(r, a):
return np.cumsum(r, a) - 3 * r // a + np.exp(a)
f(r_mat, a_array, my_complicated_func, np.add, np.nan, 0.0)
关键是my_complicated_func
对数组进行操作。它将传递 r_mat
元素的子集,并且 a_array
的元素会根据需要沿每一行重复多次。
你也可以用知道每个位置索引的函数做同样的事情。只需将 hook
称为 hook(r_mat[r, c], a_array[r], r, c)
。现在挂钩函数必须接受两个额外的参数。原始代码相当于
f(r_mat, a_array, lambda r, a, *args: np.subtract(r, a), lambda r, a, *args: np.add(r, a), np.nan, 0)
import numpy as np
repeat=int(1e5)
r_base = np.linspace(0,4,5)
a_base = 2
np.random.seed(0)
r_mat = r_base * np.random.uniform(0.9,1.1,(repeat,5))
a_array = a_base * np.random.uniform(0.9,1.1, repeat)
# original slow approach
def func_vetorized_level1(r_row, a):
if r_row.mean()>2:
result = np.where((r_row >= a), r_row - a, np.nan)
else:
result = np.where((r_row >= a), r_row + a, 0)
return result
# try to broadcast this func to every row of r_mat using list comprehension
def func_list_level2(r_mat, a_array):
res_mat = np.array([func_vetorized_level1(this_r_row, this_a)
for this_r_row, this_a in zip(r_mat, a_array)])
return res_mat
# faster with direct masking, but with unnecessary more calculation
def f_faster(r_mat,a_array):
a = a_array[:, None] # to column vector
row_mask = (r_mat.mean(axis=1) > 2)[:,None]
elem_mask = r_mat >= a
out = np.empty_like(r_mat)
out[row_mask & elem_mask] = (r_mat - a)[row_mask & elem_mask]
out[~row_mask & elem_mask] = (r_mat + a)[~row_mask & elem_mask]
out[row_mask & ~elem_mask] = np.nan
out[~row_mask & ~elem_mask] = 0
return out
# fastest with ufunc in numpy as suggested by @mad_physicist
def f_fastest(r_mat,a_array):
a = a_array[:, None] # to column vector
row_mask = (r_mat.mean(axis=1) > 2)[:,None]
elem_mask = r_mat >= a
out = np.empty_like(r_mat)
np.subtract(r_mat, a, out=out, where=row_mask & elem_mask)
np.add(r_mat, a, out=out, where=~row_mask & elem_mask)
out[row_mask & ~elem_mask] = np.nan
out[~row_mask & ~elem_mask] = 0
return out
请问是否可以有一个用户自定义的func可以使用,或者利用最快的方法?我考虑过使用索引,但发现它具有挑战性,因为使用 [row_ind, co_ind]
的切片元素是所选元素的一维数组。我看到可以使用 reshape
将切片矩阵放入矩阵,但是有一种优雅的方法吗?理想情况下,此 r_mat + a
操作可以由用户定义的函数替换。
你绝对可以有一个带有用户定义函数的矢量化解决方案,只要该函数被矢量化以在一维数组上工作element-wise(使用 numpy 函数编写的任何东西都应该是这种情况)盒子)。
假设您有 r_mat
作为 (m, n)
矩阵,a_array
作为 (m,)
向量。您可以编写函数来接受挂钩。每个钩子都可以是常量或可调用对象。如果它是可调用的,则使用两个相同长度的数组调用它,并且必须 return 第三个相同长度的数组。您可以更改该合约以包含指数或您想要的任何内容:
def f(r_mat, a_array, hook11, hook01, hook10, hook00):
a = a_array[:, None] # to column vector
row_mask = (r_mat.mean(axis=1) > 2)[:,None]
elem_mask = r_mat >= a
out = np.empty_like(r_mat)
def apply_hook(mask, hook):
r, c = np.nonzero(mask)
out[r, c] = hook(r_mat[r, c], a_array[r]) if callable(hook) else hook
apply_hook(row_mask & elem_mask, hook11)
apply_hook(~row_mask & elem_mask, hook01)
apply_hook(row_mask & ~elem_mask, hook10)
apply_hook(~row_mask & ~elem_mask, hook00)
return out
您代码中的当前配置将被称为
f(r_mat, a_array, np.subtract, np.add, np.nan, 0)
假设您想做一些比 np.subtract
更复杂的事情。你可以这样做,例如:
def my_complicated_func(r, a):
return np.cumsum(r, a) - 3 * r // a + np.exp(a)
f(r_mat, a_array, my_complicated_func, np.add, np.nan, 0.0)
关键是my_complicated_func
对数组进行操作。它将传递 r_mat
元素的子集,并且 a_array
的元素会根据需要沿每一行重复多次。
你也可以用知道每个位置索引的函数做同样的事情。只需将 hook
称为 hook(r_mat[r, c], a_array[r], r, c)
。现在挂钩函数必须接受两个额外的参数。原始代码相当于
f(r_mat, a_array, lambda r, a, *args: np.subtract(r, a), lambda r, a, *args: np.add(r, a), np.nan, 0)