使用 numpy.where 或任何等效函数对迭代算法进行矢量化
Vectorize an iterative algorithm with numpy.where or any equivalent function
我希望我可以用 np.where 或一些等效的(高效的)numpy 函数构造以下算法:
def generate_signal(r):
signal = np.zeros(len(r), dtype=int)
lastSignal = 0
for i in range(len(r)):
if r[i] <= 30:
lastSignal = 1
elif r[i] >= 60:
lastSignal = 0
signal[i] = lastSignal
return signal
这是一个 input/output 的例子:
r = np.array([50, 52, 59, 69, 47, 33, 27, 26, 20, 30, 33, 35, 58, 55, 48, 60, 68, 55, 43, 49, 33, 30, 22, 28])
s = generate_signal(r)
print(s) # This is the result: [0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 1 1 1]
print(list(zip(r, s))) # A zipped result (in case it helps): [(50, 0), (52, 0), (59, 0), (69, 0), (47, 0), (33, 0), (27, 1), (26, 1), (20, 1), (30, 1), (33, 1), (35, 1), (58, 1), (55, 1), (48, 1), (60, 0), (68, 0), (55, 0), (43, 0), (49, 0), (33, 0), (30, 1), (22, 1), (28, 1)]
您可以按如下方式使用np.repeat
:
import numpy as np
def f_pp():
rr = np.concatenate([[70], r, [70]])
l30, g60 = rr <= 30, rr >= 60
df, = np.where(l30 | g60)
reps = np.diff(df)
reps[0] -= 1
return l30[df[:-1]].astype(int).repeat(reps)
使用 OP 的示例和一些较大的(1000
元素)随机的时间和验证:
def generate_signal():
signal = np.zeros(len(r), dtype=int)
lastSignal = 0
for i in range(len(r)):
if r[i] <= 30:
lastSignal = 1
elif r[i] >= 60:
lastSignal = 0
signal[i] = lastSignal
return signal
def f_ff():
pos_tran = np.maximum(0, np.diff(np.int8(r <= 30)))
neg_tran = np.maximum(0, np.diff(np.int8(r >= 60)))
neg_tran[0:np.nonzero(pos_tran)[0][0]] = 0
return np.insert(np.cumsum(pos_tran - neg_tran), 0, 0)
from timeit import timeit
glb = globals()
kwds = dict(globals=glb, number=1000)
r = np.array([50, 52, 59, 69, 47, 33, 27, 26, 20, 30, 33, 35, 58, 55, 48, 60, 68, 55, 43, 49, 33, 30, 22, 28])
print('OP: {:8.4f} ms'.format(timeit(generate_signal, **kwds)))
print('ff: {:8.4f} ms'.format(timeit(f_ff, **kwds)))
print('pp: {:8.4f} ms'.format(timeit(f_pp, **kwds)))
print('ff correct:', np.all(generate_signal() == f_ff()))
print('pp correct:', np.all(generate_signal() == f_pp()))
R = np.random.randint(20, 70, (10, 1000))
print('OP: {:8.4f} ms'.format(sum(timeit(generate_signal, **kwds) for glb['r'] in R) / 10))
print('ff: {:8.4f} ms'.format(sum(timeit(f_ff, **kwds) for glb['r'] in R) / 10))
print('pp: {:8.4f} ms'.format(sum(timeit(f_pp, **kwds) for glb['r'] in R) / 10))
print('ff correct:', all(np.all(generate_signal() == f_ff()) for glb['r'] in R))
print('pp correct:', all(np.all(generate_signal() == f_pp()) for glb['r'] in R))
示例输出:
OP: 0.0138 ms
ff: 0.0405 ms
pp: 0.0121 ms
ff correct: True
pp correct: True
OP: 0.5386 ms
ff: 0.0539 ms
pp: 0.0215 ms
ff correct: False
pp correct: True
构建结果信号的正负瞬变:
>>> pos_tran = np.maximum(0, np.diff(np.int8(r <= 30)))
>>> neg_tran = np.maximum(0, np.diff(np.int8(r >= 60)))
在第一个正瞬变之前移除负瞬变:
>>> neg_tran[0:np.nonzero(pos_tran)[0][0]] = 0
积分信号(累计和)并重新插入丢失在np.diff中的前导0:
>>> np.insert(np.cumsum(pos_tran - neg_tran), 0, 0)
array([0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1])
我希望我可以用 np.where 或一些等效的(高效的)numpy 函数构造以下算法:
def generate_signal(r):
signal = np.zeros(len(r), dtype=int)
lastSignal = 0
for i in range(len(r)):
if r[i] <= 30:
lastSignal = 1
elif r[i] >= 60:
lastSignal = 0
signal[i] = lastSignal
return signal
这是一个 input/output 的例子:
r = np.array([50, 52, 59, 69, 47, 33, 27, 26, 20, 30, 33, 35, 58, 55, 48, 60, 68, 55, 43, 49, 33, 30, 22, 28])
s = generate_signal(r)
print(s) # This is the result: [0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 1 1 1]
print(list(zip(r, s))) # A zipped result (in case it helps): [(50, 0), (52, 0), (59, 0), (69, 0), (47, 0), (33, 0), (27, 1), (26, 1), (20, 1), (30, 1), (33, 1), (35, 1), (58, 1), (55, 1), (48, 1), (60, 0), (68, 0), (55, 0), (43, 0), (49, 0), (33, 0), (30, 1), (22, 1), (28, 1)]
您可以按如下方式使用np.repeat
:
import numpy as np
def f_pp():
rr = np.concatenate([[70], r, [70]])
l30, g60 = rr <= 30, rr >= 60
df, = np.where(l30 | g60)
reps = np.diff(df)
reps[0] -= 1
return l30[df[:-1]].astype(int).repeat(reps)
使用 OP 的示例和一些较大的(1000
元素)随机的时间和验证:
def generate_signal():
signal = np.zeros(len(r), dtype=int)
lastSignal = 0
for i in range(len(r)):
if r[i] <= 30:
lastSignal = 1
elif r[i] >= 60:
lastSignal = 0
signal[i] = lastSignal
return signal
def f_ff():
pos_tran = np.maximum(0, np.diff(np.int8(r <= 30)))
neg_tran = np.maximum(0, np.diff(np.int8(r >= 60)))
neg_tran[0:np.nonzero(pos_tran)[0][0]] = 0
return np.insert(np.cumsum(pos_tran - neg_tran), 0, 0)
from timeit import timeit
glb = globals()
kwds = dict(globals=glb, number=1000)
r = np.array([50, 52, 59, 69, 47, 33, 27, 26, 20, 30, 33, 35, 58, 55, 48, 60, 68, 55, 43, 49, 33, 30, 22, 28])
print('OP: {:8.4f} ms'.format(timeit(generate_signal, **kwds)))
print('ff: {:8.4f} ms'.format(timeit(f_ff, **kwds)))
print('pp: {:8.4f} ms'.format(timeit(f_pp, **kwds)))
print('ff correct:', np.all(generate_signal() == f_ff()))
print('pp correct:', np.all(generate_signal() == f_pp()))
R = np.random.randint(20, 70, (10, 1000))
print('OP: {:8.4f} ms'.format(sum(timeit(generate_signal, **kwds) for glb['r'] in R) / 10))
print('ff: {:8.4f} ms'.format(sum(timeit(f_ff, **kwds) for glb['r'] in R) / 10))
print('pp: {:8.4f} ms'.format(sum(timeit(f_pp, **kwds) for glb['r'] in R) / 10))
print('ff correct:', all(np.all(generate_signal() == f_ff()) for glb['r'] in R))
print('pp correct:', all(np.all(generate_signal() == f_pp()) for glb['r'] in R))
示例输出:
OP: 0.0138 ms
ff: 0.0405 ms
pp: 0.0121 ms
ff correct: True
pp correct: True
OP: 0.5386 ms
ff: 0.0539 ms
pp: 0.0215 ms
ff correct: False
pp correct: True
构建结果信号的正负瞬变:
>>> pos_tran = np.maximum(0, np.diff(np.int8(r <= 30)))
>>> neg_tran = np.maximum(0, np.diff(np.int8(r >= 60)))
在第一个正瞬变之前移除负瞬变:
>>> neg_tran[0:np.nonzero(pos_tran)[0][0]] = 0
积分信号(累计和)并重新插入丢失在np.diff中的前导0:
>>> np.insert(np.cumsum(pos_tran - neg_tran), 0, 0)
array([0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1])