从时间序列数据中获取间隔
Getting intervals from a time series data
我有一个很奇怪的问题。我目前正在处理 time-series 数据,并且在我的数据集中有几个峰值。该数据是使用中子密度测井机收集的,它描述了传感器连续一段时间记录的事件。数据中的峰值对应于该机器下钻孔时的某个有趣间隔。所以,高峰很重要。然而,重要的不仅仅是山峰。这是整个间隔(或者至少如我所描述的那样;参见我的附图)。现在我的问题是,是否有一种信号处理(最好在 Python 中)方法我可以使用或研究哪些可以让我将这个信号分为不同的间隔,每个对应于本地 minima/maxima?
我最初的方法是使用 Kleinberg 2002 描述的突发检测算法,但我没有成功,所以我想听听其他人对此的看法。
这是原始数据:
这就是我想要做的:
我从散点图中提取数据进行分析,这是我第一次解决问题时使用的示例代码作为构建基础。它所做的是将三阶多项式作为基线拟合到所有数据,然后在标记为 "rectangularize section" 的代码部分中查找基线曲线上方或下方截止距离处的数据点。我希望这至少可以提出解决问题的途径。我想到的一项改进是在 "rectangularize" 代码中实现最小宽度。
import numpy, matplotlib
import matplotlib.pyplot as plt
##########################################################
# data load section
f = open('/home/zunzun/neutron_refl_bore/rawdata.dat')
rawdata = f.read()
f.close()
xData = []
yData = []
for line in rawdata.split('\n'):
if line:
spl = line.split()
xData.append(float(spl[0]))
yData.append(float(spl[1]))
xData = numpy.array(xData)
yData = numpy.array(yData)
##########################################################
#rectangularize section
cutoffLimit = 1.6
polynomialOrder = 3 # for baseline curve
modelParameters = numpy.polyfit(xData, yData, polynomialOrder)
modelPredictions = numpy.polyval(modelParameters, xData)
modelDifference = yData - modelPredictions
maxDiff = max(modelDifference)
minDiff = min(modelDifference)
aboveLimit = (modelDifference > cutoffLimit) * maxDiff
belowLimit = (modelDifference < -cutoffLimit) * minDiff
rectanglarized = modelPredictions + + belowLimit + aboveLimit
##########################################################
# graphics output section
def ModelAndScatterPlot(graphWidth, graphHeight):
f = plt.figure(figsize=(graphWidth/100.0, graphHeight/100.0), dpi=100)
axes = f.add_subplot(111)
# first the raw data as a scatter plot
axes.plot(xData, yData, 'D')
# create data for the fitted equation plot
xModel = numpy.linspace(min(xData), max(xData))
yModel = numpy.polyval(modelParameters, xModel)
# now the model as a line plot
axes.plot(xModel, yModel)
# now show rectangularized
axes.plot(xData, rectanglarized)
axes.set_title('Data Outside Polynomial Baseline +/- Cutoff') # add a title
axes.set_xlabel('X Data') # X axis data label
axes.set_ylabel('Y Data') # Y axis data label
plt.show()
plt.close('all') # clean up after using pyplot
graphWidth = 800
graphHeight = 600
ModelAndScatterPlot(graphWidth, graphHeight)
由于没有原始数据,我生成了一些测试数据。方法如下。如果数据是一组具有跳跃的高原,检查移动 window 的标准偏差将在跳跃处给出峰值。使用阈值隔离峰值。通过在应用移动平均线后查看数据来估计跳跃。这为拟合提供了一个良好的起点。作为 fit 函数,我再次使用我最喜欢的 tanh
。
结果如下:
import matplotlib.pyplot as plt
import numpy as np
from operator import itemgetter
from itertools import groupby
from scipy.optimize import leastsq
def moving_average( data, windowsize, **kwargs ):
mode=kwargs.pop('mode', 'valid')
weight = np.ones( windowsize )/ float( windowsize )
return np.convolve( data, weight, mode=mode)
def moving_sigma( data, windowsize ):
l = len( data )
sout = list()
for i in range( l - windowsize + 1 ):
sout += [ np.std( data[ i : i + windowsize ] ) ]
return np.array( sout )
def m_step( x, s, x0List, ampList, off ):
assert len( x0List ) == len( ampList )
out = [ a * 0.5 * ( 1 + np.tanh( s * (x - x0) ) ) for a, x0 in zip( ampList, x0List )]
return sum( out ) + off
def residuals( params, xdata, ydata ):
assert not len(params) % 2
off = params[0]
s = params[1]
rest = params[2:]
n = len( rest )
x0 = rest[:n/2]
am = rest[n/2:]
diff = [ y - m_step( x,s, x0, am, off ) for x, y in zip( xdata, ydata ) ]
return diff
### generate data
np.random.seed(776)
a=0
data = list()
for i in range(15):
a = 50 * ( 1 - 2 * np.random.random() )
for _ in range ( np.random.randint( 5, 35 ) ):
data += [a]
xx = np.arange( len( data ), dtype=np.float )
noisydata = list()
for x in data:
noisydata += [ x + np.random.normal( scale=5 ) ]
###define problem specific parameters
myWindow = 10
thresh = 8
cutoffscale = 1.1
### data treatment
avdata = moving_average( noisydata, myWindow )
avx = moving_average( xx, myWindow )
sdata = moving_sigma( noisydata, myWindow )
### getting start values for the fit
seq = [x for x in np.where( sdata > thresh )[0] ]
# from
jumps = [ map( itemgetter(1), g ) for k, g in groupby( enumerate( seq ), lambda ( i, x ) : i - x ) ]
xjumps = [ [ avx[ k ] for k in xList ] for xList in jumps ]
means = [ np.mean( x ) for x in xjumps ]
### the delta is too small as one only looks above thresh so lets increase it by guessing
deltas = [ ( avdata[ x[-1] ] - avdata[ x[0] ] ) * cutoffscale for x in jumps ]
guess = [ avdata[0], 2] + means + deltas
### fitting
sol, err = leastsq( residuals, guess, args=( xx, noisydata ), maxfev=10000 )
fittedoff = sol[0]
fitteds = sol[1]
fittedx0 = sol[ 2 : 2 + len( means ) ]
fittedam = sol[ 2 + len( means ) : ]
### plotting
fig = plt.figure()
ax = fig.add_subplot( 1, 1, 1 )
# ~ ax.plot( data )
ax.plot( xx, noisydata, label='noisy data' )
ax.plot( avx, avdata, label='moving average' )
ax.plot( avx, sdata, label='window sigma' )
for j in means:
ax.axvline( j, c='#606060', linestyle=':' )
yy = [ m_step( x, 2, means, deltas, avdata[0] ) for x in xx]
bestyy = [ m_step( x, fitteds, fittedx0, fittedam, fittedoff ) for x in xx ]
ax.plot( xx, yy, label='guess' )
ax.plot( xx, bestyy, label='fit' )
plt.legend( loc=0 )
plt.show()
给出下图:
未检测到一些较小的跳跃,但应该清楚这是预期的。而且,整个过程也不是很快。由于阈值的原因,初始猜测变得更糟 x
存在更多的跳跃。最后,未检测到一系列小跳跃,因此数据中的结果斜率被拟合为一个具有明显大误差的大平台。可以为此引入额外的检查并添加相应的跳跃位置。
另请注意,window 大小的选择决定了两次跳跃将被检测到的距离。
我有一个很奇怪的问题。我目前正在处理 time-series 数据,并且在我的数据集中有几个峰值。该数据是使用中子密度测井机收集的,它描述了传感器连续一段时间记录的事件。数据中的峰值对应于该机器下钻孔时的某个有趣间隔。所以,高峰很重要。然而,重要的不仅仅是山峰。这是整个间隔(或者至少如我所描述的那样;参见我的附图)。现在我的问题是,是否有一种信号处理(最好在 Python 中)方法我可以使用或研究哪些可以让我将这个信号分为不同的间隔,每个对应于本地 minima/maxima?
我最初的方法是使用 Kleinberg 2002 描述的突发检测算法,但我没有成功,所以我想听听其他人对此的看法。
这是原始数据:
这就是我想要做的:
我从散点图中提取数据进行分析,这是我第一次解决问题时使用的示例代码作为构建基础。它所做的是将三阶多项式作为基线拟合到所有数据,然后在标记为 "rectangularize section" 的代码部分中查找基线曲线上方或下方截止距离处的数据点。我希望这至少可以提出解决问题的途径。我想到的一项改进是在 "rectangularize" 代码中实现最小宽度。
import numpy, matplotlib
import matplotlib.pyplot as plt
##########################################################
# data load section
f = open('/home/zunzun/neutron_refl_bore/rawdata.dat')
rawdata = f.read()
f.close()
xData = []
yData = []
for line in rawdata.split('\n'):
if line:
spl = line.split()
xData.append(float(spl[0]))
yData.append(float(spl[1]))
xData = numpy.array(xData)
yData = numpy.array(yData)
##########################################################
#rectangularize section
cutoffLimit = 1.6
polynomialOrder = 3 # for baseline curve
modelParameters = numpy.polyfit(xData, yData, polynomialOrder)
modelPredictions = numpy.polyval(modelParameters, xData)
modelDifference = yData - modelPredictions
maxDiff = max(modelDifference)
minDiff = min(modelDifference)
aboveLimit = (modelDifference > cutoffLimit) * maxDiff
belowLimit = (modelDifference < -cutoffLimit) * minDiff
rectanglarized = modelPredictions + + belowLimit + aboveLimit
##########################################################
# graphics output section
def ModelAndScatterPlot(graphWidth, graphHeight):
f = plt.figure(figsize=(graphWidth/100.0, graphHeight/100.0), dpi=100)
axes = f.add_subplot(111)
# first the raw data as a scatter plot
axes.plot(xData, yData, 'D')
# create data for the fitted equation plot
xModel = numpy.linspace(min(xData), max(xData))
yModel = numpy.polyval(modelParameters, xModel)
# now the model as a line plot
axes.plot(xModel, yModel)
# now show rectangularized
axes.plot(xData, rectanglarized)
axes.set_title('Data Outside Polynomial Baseline +/- Cutoff') # add a title
axes.set_xlabel('X Data') # X axis data label
axes.set_ylabel('Y Data') # Y axis data label
plt.show()
plt.close('all') # clean up after using pyplot
graphWidth = 800
graphHeight = 600
ModelAndScatterPlot(graphWidth, graphHeight)
由于没有原始数据,我生成了一些测试数据。方法如下。如果数据是一组具有跳跃的高原,检查移动 window 的标准偏差将在跳跃处给出峰值。使用阈值隔离峰值。通过在应用移动平均线后查看数据来估计跳跃。这为拟合提供了一个良好的起点。作为 fit 函数,我再次使用我最喜欢的 tanh
。
结果如下:
import matplotlib.pyplot as plt
import numpy as np
from operator import itemgetter
from itertools import groupby
from scipy.optimize import leastsq
def moving_average( data, windowsize, **kwargs ):
mode=kwargs.pop('mode', 'valid')
weight = np.ones( windowsize )/ float( windowsize )
return np.convolve( data, weight, mode=mode)
def moving_sigma( data, windowsize ):
l = len( data )
sout = list()
for i in range( l - windowsize + 1 ):
sout += [ np.std( data[ i : i + windowsize ] ) ]
return np.array( sout )
def m_step( x, s, x0List, ampList, off ):
assert len( x0List ) == len( ampList )
out = [ a * 0.5 * ( 1 + np.tanh( s * (x - x0) ) ) for a, x0 in zip( ampList, x0List )]
return sum( out ) + off
def residuals( params, xdata, ydata ):
assert not len(params) % 2
off = params[0]
s = params[1]
rest = params[2:]
n = len( rest )
x0 = rest[:n/2]
am = rest[n/2:]
diff = [ y - m_step( x,s, x0, am, off ) for x, y in zip( xdata, ydata ) ]
return diff
### generate data
np.random.seed(776)
a=0
data = list()
for i in range(15):
a = 50 * ( 1 - 2 * np.random.random() )
for _ in range ( np.random.randint( 5, 35 ) ):
data += [a]
xx = np.arange( len( data ), dtype=np.float )
noisydata = list()
for x in data:
noisydata += [ x + np.random.normal( scale=5 ) ]
###define problem specific parameters
myWindow = 10
thresh = 8
cutoffscale = 1.1
### data treatment
avdata = moving_average( noisydata, myWindow )
avx = moving_average( xx, myWindow )
sdata = moving_sigma( noisydata, myWindow )
### getting start values for the fit
seq = [x for x in np.where( sdata > thresh )[0] ]
# from
jumps = [ map( itemgetter(1), g ) for k, g in groupby( enumerate( seq ), lambda ( i, x ) : i - x ) ]
xjumps = [ [ avx[ k ] for k in xList ] for xList in jumps ]
means = [ np.mean( x ) for x in xjumps ]
### the delta is too small as one only looks above thresh so lets increase it by guessing
deltas = [ ( avdata[ x[-1] ] - avdata[ x[0] ] ) * cutoffscale for x in jumps ]
guess = [ avdata[0], 2] + means + deltas
### fitting
sol, err = leastsq( residuals, guess, args=( xx, noisydata ), maxfev=10000 )
fittedoff = sol[0]
fitteds = sol[1]
fittedx0 = sol[ 2 : 2 + len( means ) ]
fittedam = sol[ 2 + len( means ) : ]
### plotting
fig = plt.figure()
ax = fig.add_subplot( 1, 1, 1 )
# ~ ax.plot( data )
ax.plot( xx, noisydata, label='noisy data' )
ax.plot( avx, avdata, label='moving average' )
ax.plot( avx, sdata, label='window sigma' )
for j in means:
ax.axvline( j, c='#606060', linestyle=':' )
yy = [ m_step( x, 2, means, deltas, avdata[0] ) for x in xx]
bestyy = [ m_step( x, fitteds, fittedx0, fittedam, fittedoff ) for x in xx ]
ax.plot( xx, yy, label='guess' )
ax.plot( xx, bestyy, label='fit' )
plt.legend( loc=0 )
plt.show()
给出下图:
未检测到一些较小的跳跃,但应该清楚这是预期的。而且,整个过程也不是很快。由于阈值的原因,初始猜测变得更糟 x
存在更多的跳跃。最后,未检测到一系列小跳跃,因此数据中的结果斜率被拟合为一个具有明显大误差的大平台。可以为此引入额外的检查并添加相应的跳跃位置。
另请注意,window 大小的选择决定了两次跳跃将被检测到的距离。