如何在 Python 中向量化这个循环的峰值查找?
How to vectorize this peak finding for loop in Python?
基本上我正在编写一个峰值查找函数,需要能够在基准测试中击败 scipy.argrelextrema
。这是我正在使用的数据的 link,代码:
https://drive.google.com/open?id=1U-_xQRWPoyUXhQUhFgnM3ByGw-1VImKB
如果这个link过期了,数据可以在杜高斯贝银行的在线历史数据下载器找到。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
data = pd.read_csv('EUR_USD.csv')
data.columns = ['Date', 'open', 'high', 'low', 'close','volume']
data.Date = pd.to_datetime(data.Date, format='%d.%m.%Y %H:%M:%S.%f')
data = data.set_index(data.Date)
data = data[['open', 'high', 'low', 'close']]
data = data.drop_duplicates(keep=False)
price = data.close.values
def fft_detect(price, p=0.4):
trans = np.fft.rfft(price)
trans[round(p*len(trans)):] = 0
inv = np.fft.irfft(trans)
dy = np.gradient(inv)
peaks_idx = np.where(np.diff(np.sign(dy)) == -2)[0] + 1
valleys_idx = np.where(np.diff(np.sign(dy)) == 2)[0] + 1
patt_idx = list(peaks_idx) + list(valleys_idx)
patt_idx.sort()
label = [x for x in np.diff(np.sign(dy)) if x != 0]
# Look for Better Peaks
l = 2
new_inds = []
for i in range(0,len(patt_idx[:-1])):
search = np.arange(patt_idx[i]-(l+1),patt_idx[i]+(l+1))
if label[i] == -2:
idx = price[search].argmax()
elif label[i] == 2:
idx = price[search].argmin()
new_max = search[idx]
new_inds.append(new_max)
plt.plot(price)
plt.plot(inv)
plt.scatter(patt_idx,price[patt_idx])
plt.scatter(new_inds,price[new_inds],c='g')
plt.show()
return peaks_idx, price[peaks_idx]
它基本上使用快速傅立叶变换 (FFT) 来平滑数据,然后取导数来找到平滑数据的最小和最大索引,然后在未平滑数据上找到相应的峰值。有时,由于某些平滑效果,它找到的峰值不是理想的,所以我 运行 这个 for 循环为 l
指定的边界之间的每个索引搜索更高或更低的点。我需要帮助矢量化此 for
循环!我不知道该怎么做。如果没有 for
循环,我的代码比 scipy.argrelextrema
快大约 50%,但 for 循环会减慢它的速度。因此,如果我能找到一种对其进行矢量化的方法,那将是 scipy.argrelextrema
的一种非常快速且非常有效的替代方法。这两张图片分别代表没有和有 for
循环的数据。
这个可以。它并不完美,但希望它能获得您想要的,并向您展示如何矢量化。很高兴听到您想到的任何改进
label = np.array(label[:-1]) # not sure why this is 1 unit longer than search.shape[0]?
# the idea is to make the index matrix you're for looping over row by row all in one go.
# This part is sloppy and you can improve this generation.
search = np.vstack((np.arange(patt_idx[i]-(l+1),patt_idx[i]+(l+1)) for i in range(0,len(patt_idx[:-1])))) # you can refine this.
# then you can make the price matrix
price = price[search]
# and you can swap the sign of elements so you only need to do argmin instead of both argmin and argmax
price[label==-2] = - price[label==-2]
# now find the indices of the minimum price on each row
idx = np.argmin(price,axis=1)
# and then extract the refined indices from the search matrix
new_inds = search[np.arange(idx.shape[0]),idx] # this too can be cleaner.
# not sure what's going on here so that search[:,idx] doesn't work for me
# probably just a misunderstanding
我发现这重现了您的结果,但我没有计时。我怀疑搜索生成速度很慢,但可能仍然比您的 for 循环快。
编辑:
这是生成 search
的更好方法:
patt_idx = np.array(patt_idx)
starts = patt_idx[:-1]-(l+1)
stops = patt_idx[:-1]+(l+1)
ds = stops-starts
s0 = stops.shape[0]
s1 = ds[0]
search = np.reshape(np.repeat(stops - ds.cumsum(), ds) + np.arange(ds.sum()),(s0,s1))
这是一个替代方案...它使用列表理解,通常比 for 循环更快
l = 2
# Define the bounds beforehand, its marginally faster than doing it in the loop
upper = np.array(patt_idx) + l + 1
lower = np.array(patt_idx) - l - 1
# List comprehension...
new_inds = [price[low:hi].argmax() + low if lab == -2 else
price[low:hi].argmin() + low
for low, hi, lab in zip(lower, upper, label)]
# Find maximum within each interval
new_max = price[new_inds]
new_global_max = np.max(new_max)
基本上我正在编写一个峰值查找函数,需要能够在基准测试中击败 scipy.argrelextrema
。这是我正在使用的数据的 link,代码:
https://drive.google.com/open?id=1U-_xQRWPoyUXhQUhFgnM3ByGw-1VImKB
如果这个link过期了,数据可以在杜高斯贝银行的在线历史数据下载器找到。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
data = pd.read_csv('EUR_USD.csv')
data.columns = ['Date', 'open', 'high', 'low', 'close','volume']
data.Date = pd.to_datetime(data.Date, format='%d.%m.%Y %H:%M:%S.%f')
data = data.set_index(data.Date)
data = data[['open', 'high', 'low', 'close']]
data = data.drop_duplicates(keep=False)
price = data.close.values
def fft_detect(price, p=0.4):
trans = np.fft.rfft(price)
trans[round(p*len(trans)):] = 0
inv = np.fft.irfft(trans)
dy = np.gradient(inv)
peaks_idx = np.where(np.diff(np.sign(dy)) == -2)[0] + 1
valleys_idx = np.where(np.diff(np.sign(dy)) == 2)[0] + 1
patt_idx = list(peaks_idx) + list(valleys_idx)
patt_idx.sort()
label = [x for x in np.diff(np.sign(dy)) if x != 0]
# Look for Better Peaks
l = 2
new_inds = []
for i in range(0,len(patt_idx[:-1])):
search = np.arange(patt_idx[i]-(l+1),patt_idx[i]+(l+1))
if label[i] == -2:
idx = price[search].argmax()
elif label[i] == 2:
idx = price[search].argmin()
new_max = search[idx]
new_inds.append(new_max)
plt.plot(price)
plt.plot(inv)
plt.scatter(patt_idx,price[patt_idx])
plt.scatter(new_inds,price[new_inds],c='g')
plt.show()
return peaks_idx, price[peaks_idx]
它基本上使用快速傅立叶变换 (FFT) 来平滑数据,然后取导数来找到平滑数据的最小和最大索引,然后在未平滑数据上找到相应的峰值。有时,由于某些平滑效果,它找到的峰值不是理想的,所以我 运行 这个 for 循环为 l
指定的边界之间的每个索引搜索更高或更低的点。我需要帮助矢量化此 for
循环!我不知道该怎么做。如果没有 for
循环,我的代码比 scipy.argrelextrema
快大约 50%,但 for 循环会减慢它的速度。因此,如果我能找到一种对其进行矢量化的方法,那将是 scipy.argrelextrema
的一种非常快速且非常有效的替代方法。这两张图片分别代表没有和有 for
循环的数据。
这个可以。它并不完美,但希望它能获得您想要的,并向您展示如何矢量化。很高兴听到您想到的任何改进
label = np.array(label[:-1]) # not sure why this is 1 unit longer than search.shape[0]?
# the idea is to make the index matrix you're for looping over row by row all in one go.
# This part is sloppy and you can improve this generation.
search = np.vstack((np.arange(patt_idx[i]-(l+1),patt_idx[i]+(l+1)) for i in range(0,len(patt_idx[:-1])))) # you can refine this.
# then you can make the price matrix
price = price[search]
# and you can swap the sign of elements so you only need to do argmin instead of both argmin and argmax
price[label==-2] = - price[label==-2]
# now find the indices of the minimum price on each row
idx = np.argmin(price,axis=1)
# and then extract the refined indices from the search matrix
new_inds = search[np.arange(idx.shape[0]),idx] # this too can be cleaner.
# not sure what's going on here so that search[:,idx] doesn't work for me
# probably just a misunderstanding
我发现这重现了您的结果,但我没有计时。我怀疑搜索生成速度很慢,但可能仍然比您的 for 循环快。
编辑:
这是生成 search
的更好方法:
patt_idx = np.array(patt_idx)
starts = patt_idx[:-1]-(l+1)
stops = patt_idx[:-1]+(l+1)
ds = stops-starts
s0 = stops.shape[0]
s1 = ds[0]
search = np.reshape(np.repeat(stops - ds.cumsum(), ds) + np.arange(ds.sum()),(s0,s1))
这是一个替代方案...它使用列表理解,通常比 for 循环更快
l = 2
# Define the bounds beforehand, its marginally faster than doing it in the loop
upper = np.array(patt_idx) + l + 1
lower = np.array(patt_idx) - l - 1
# List comprehension...
new_inds = [price[low:hi].argmax() + low if lab == -2 else
price[low:hi].argmin() + low
for low, hi, lab in zip(lower, upper, label)]
# Find maximum within each interval
new_max = price[new_inds]
new_global_max = np.max(new_max)