How to use Dynamic Time Warping with kNN in Python

I have a time series dataset with two labels (0 and 1). I am using Dynamic Time Warping (DTW) as the similarity measure for classification with k-nearest neighbors (kNN), as described in these two excellent blog posts:

However, for the kNN classification both posts use their own kNN implementations.

I would like to use sklearn's options, such as GridSearchCV, for my classification. Therefore, I want to know how to use Dynamic Time Warping (DTW) with sklearn's kNN.

Note: I am not limited to sklearn and would also be happy to receive answers that use other libraries.

I am happy to provide more details if needed.

You can use a custom metric for KNN. You therefore only need to implement DTW yourself (or use/adapt any existing DTW implementation in Python). [gist of this code]

import numpy as np
from scipy.spatial import distance
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

#toy dataset: 100 random "time series" of length 10 with binary labels
X = np.random.random((100, 10))
y = np.random.randint(0, 2, (100))
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

#custom metric: classic dynamic-programming DTW between two 1-D series
def DTW(a, b):
    an = a.size
    bn = b.size
    # pairwise distances between every element of a and every element of b
    pointwise_distance = distance.cdist(a.reshape(-1, 1), b.reshape(-1, 1))
    # accumulated-cost matrix with an extra row/column of inf as the boundary
    # (np.full avoids the deprecated np.matrix)
    cumdist = np.full((an + 1, bn + 1), np.inf)
    cumdist[0, 0] = 0

    for ai in range(an):
        for bi in range(bn):
            # cheapest way to reach cell (ai+1, bi+1)
            minimum_cost = min(cumdist[ai, bi + 1],
                               cumdist[ai + 1, bi],
                               cumdist[ai, bi])
            cumdist[ai + 1, bi + 1] = pointwise_distance[ai, bi] + minimum_cost

    return cumdist[an, bn]

#train
parameters = {'n_neighbors':[2, 4, 8]}
clf = GridSearchCV(KNeighborsClassifier(metric=DTW), parameters, cv=3, verbose=1)
clf.fit(X_train, y_train)



#evaluate
y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))

This produces:

Fitting 3 folds for each of 3 candidates, totalling 9 fits        

[Parallel(n_jobs=1)]: Done   9 out of   9 | elapsed:   29.0s finished

                         precision    recall  f1-score   support

                      0       0.57      0.89      0.70        18
                      1       0.60      0.20      0.30        15

            avg / total       0.58      0.58      0.52        33
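
A usage note that is not part of the answer above: because a Python callable metric is evaluated pair by pair, the search is slow. GridSearchCV's standard n_jobs parameter and its best_params_/best_score_ attributes can help; a minimal sketch, reusing the names defined above:

#sketch: parallelise the grid search and inspect the winning setting
clf = GridSearchCV(KNeighborsClassifier(metric=DTW), parameters, cv=3, n_jobs=-1, verbose=1)
clf.fit(X_train, y_train)

print(clf.best_params_)   # e.g. {'n_neighbors': 2}
print(clf.best_score_)    # mean cross-validated accuracy of that setting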

Use dtaidistance. This is a simplified version of the pipeline I am using to find the best fit across all window sizes between 1 and 20:

import numpy as np
from dtaidistance import dtw
from sklearn.metrics import f1_score

def knn(trainX, trainY, testX, testY, w):
    # 1-nearest-neighbour classification under DTW with window width w
    predictions = np.zeros(len(testX))

    for testSampleIndex, testSample in enumerate(testX):
        minimumDistance = float('inf')
        for trainingSampleIndex, trainingSample in enumerate(trainX):
            # max_dist lets dtaidistance abandon early once the current best distance is exceeded
            distanceBetweenTestAndTrainingAScan = dtw.distance(testSample, trainingSample,
                                                               use_c=True, window=w,
                                                               max_dist=minimumDistance)
            if (distanceBetweenTestAndTrainingAScan < minimumDistance):
                minimumDistance = distanceBetweenTestAndTrainingAScan
                predictions[testSampleIndex] = trainY[trainingSampleIndex]

    return [testY, predictions]

def DTWForCurrentDataSet(testX, testY, trainX, trainY, testDataSetID):
    # try window widths 1..20 and keep the predictions with the best micro F1 score
    testDataSetBestF1Score = -float("inf")
    testDataSetBestPredictions = []
    for w in range(1, 21):
        [testY, predictions] = knn(trainX, trainY, testX, testY, w)

        microF1Score = f1_score(testY, predictions, average='micro')
        if (microF1Score > testDataSetBestF1Score):
            testDataSetBestF1Score = microF1Score
            testDataSetBestPredictions = predictions
    return testDataSetBestPredictions

def runDTW(database):
    for testDataSetID in database:
        # createTestingAndTrainingSets is a helper from the original pipeline (not shown here)
        [testX, testY, trainX, trainY, patientIDsForTraining] = createTestingAndTrainingSets(database, testDataSetID)
        testDataSetBestPredictions = DTWForCurrentDataSet(testX, testY, trainX, trainY, testDataSetID)
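
To connect this back to sklearn: a possible bridge, not part of the pipeline above and assuming that sklearn forwards metric_params keyword arguments to a callable metric, is to wrap dtaidistance's DTW as the kNN metric so that GridSearchCV can pick the window size by cross-validation instead of by test-set F1. dtw_window below is a hypothetical wrapper, not part of the original code:

#sketch only, under the assumptions stated above
import numpy as np
from dtaidistance import dtw
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KNeighborsClassifier

def dtw_window(a, b, window=None):
    # dtaidistance expects 1-D double arrays; use_c uses the C backend,
    # as in the pipeline above
    return dtw.distance(np.asarray(a, dtype=np.double),
                        np.asarray(b, dtype=np.double),
                        use_c=True, window=window)

param_grid = {
    'n_neighbors': [1, 3, 5],
    'metric_params': [{'window': w} for w in range(1, 21)],
}
clf = GridSearchCV(KNeighborsClassifier(metric=dtw_window), param_grid, cv=3, n_jobs=-1)
#clf.fit(trainX, trainY) would then select the window on the training folds only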