如何使用 Pandas 找到股票的平均方向运动?

How to find Average directional movement for stocks using Pandas?

我有一个包含 OHLCV 数据的数据框。我想知道是否有人了解使用 pandas 计算 ADX(平均方向性运动指数)的教程或方法?
import pandas as pd 
import yfinance as yf
import matplotlib.pyplot as plt
import datetime as dt 
import  numpy as nm 


# Request window: from 59 days before "now" up to "now".
start = dt.datetime.today() - dt.timedelta(days=59)
end = dt.datetime.today()

# Download MSFT data for that window via yfinance and hold it in a DataFrame.
df = pd.DataFrame(
    yf.download("MSFT", start=start, end=end)
)

平均方向指数(ADX)是 J. Welles Wilder, Jr. 开发的技术交易系统所包含的五个指标中最主要的一个,它由该系统中的其他指标计算得出。ADX 主要用作动量或趋势强度的指标,但整个 ADX 系统也可用作方向指标。
通过比较两个连续低点之间的差异与其各自高点之间的差异来计算方向运动。

对于 ADX 的 excel 计算,这是一个非常好的视频:

https://www.youtube.com/watch?v=LKDJQLrXedg&t=387s

其中的数学公式取自此处。

def ADX(df, period=14):
    """Return the most recent Average Directional Index (ADX) value.

    The calculation follows Wilder's scheme:

    1. True range (TR) per bar: the largest of ``High - Low``,
       ``|High - previous Close|`` and ``|Low - previous Close|``.
    2. Directional movement per bar: ``+DM`` is the rise in High when it
       is positive and larger than the drop in Low, otherwise 0; ``-DM``
       is the mirror image.
    3. TR, +DM and -DM are smoothed with ``alpha = 1 / period``, giving
       ``+DI = 100 * smoothed(+DM) / smoothed(TR)`` (likewise ``-DI``).
    4. ``DX = 100 * |+DI - -DI| / (+DI + -DI)``; ADX is the smoothed DX.

    Smoothing uses ``ewm(alpha=1/period, adjust=False)``, which is seeded
    with the first observation, so results on short histories are only an
    approximation of a long-run ADX.

    Args:
        df: DataFrame with ``"High"``, ``"Low"`` and ``"Close"`` columns,
            ordered oldest to newest. Any index type is accepted because
            rows are never looked up by label.
        period: Smoothing period; 14 is Wilder's conventional choice.

    Returns:
        The ADX of the last row as a float in the range 0..100.

    Raises:
        ValueError: If ``period`` is below 1 or ``df`` has fewer than two rows.
        KeyError: If a required column is missing.
    """
    if period < 1:
        raise ValueError("period must be a positive integer")
    if len(df) < 2:
        raise ValueError("ADX needs at least two rows of price data")

    alpha = 1.0 / period
    high = df["High"].astype(float)
    low = df["Low"].astype(float)
    prev_close = df["Close"].astype(float).shift(1)

    # Row-wise maximum of the three true-range candidates.
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    up_move = high.diff()
    down_move = -low.diff()
    # Only the dominant, positive move counts; everything else is zero.
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    def smooth(series):
        # The first bar has no predecessor, so it is excluded from smoothing.
        return series.iloc[1:].ewm(alpha=alpha, adjust=False).mean()

    avg_true_range = smooth(true_range)
    plus_di = 100.0 * smooth(plus_dm) / avg_true_range
    minus_di = 100.0 * smooth(minus_dm) / avg_true_range

    di_sum = plus_di + minus_di
    # No directional movement at all means no trend: DX is 0, not 0/0.
    dx = (100.0 * (plus_di - minus_di).abs() / di_sum).where(di_sum > 0, 0.0)

    adx = dx.ewm(alpha=alpha, adjust=False).mean()
    return float(adx.iloc[-1])

# Run the ADX function defined above on the downloaded frame and print its return value.
print(ADX(df))

我试了一下这个,发现了一些可以帮助你解决这个问题的东西:

def ADX(data: pd.DataFrame, period: int = 14) -> pd.DataFrame:
    """Return a copy of ``data`` with an added ``'ADX'`` column.

    True range, +DM/-DM, the directional indicators and DX are computed
    with ``alpha = 1 / period`` exponential smoothing
    (``ewm(adjust=False)``). That smoothing is seeded with the first
    observation, so the earliest ADX values are approximate and settle as
    more rows are processed. The first row is always NaN because no
    previous bar exists to measure movement against.

    All intermediates are kept in local Series, so columns of ``data``
    that happen to be called ``'TR'``, ``'ATR'``, ``'+DX'`` and so on are
    returned untouched.

    Args:
        data: Frame with ``'High'``, ``'Low'`` and ``'Close'`` columns,
            ordered oldest to newest. It is not modified.
        period: Smoothing period (default 14).

    Returns:
        A copy of ``data`` with the ``'ADX'`` column added, or overwritten
        if it already exists.

    Raises:
        ValueError: If ``period`` is below 1.
    """
    if period < 1:
        raise ValueError("period must be a positive integer")

    df = data.copy()
    alpha = 1 / period

    high = df['High']
    low = df['Low']
    prev_close = df['Close'].shift(1)

    # True range: the row-wise maximum skips the NaN candidates of row 0.
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    atr = true_range.ewm(alpha=alpha, adjust=False).mean()

    # Directional movement: keep only the dominant, positive move.
    up_move = high - high.shift(1)
    down_move = low.shift(1) - low
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((up_move < down_move) & (down_move > 0), 0.0)

    # Directional indicators.
    plus_di = (plus_dm.ewm(alpha=alpha, adjust=False).mean() / atr) * 100
    minus_di = (minus_dm.ewm(alpha=alpha, adjust=False).mean() / atr) * 100

    # DX and its smoothed form, the ADX.
    dx = ((plus_di - minus_di).abs() / (plus_di + minus_di)) * 100
    df['ADX'] = dx.ewm(alpha=alpha, adjust=False).mean()

    return df

起始阶段的数值并不准确(这是 EWM 方法的固有特点),但随着参与计算的数据点增多,结果会收敛到正确的值。

下面的实现给出的数值与 TradingView 和 Thinkorswim 上的数值完全一致。

import numpy as np

def ema(arr, periods=14, weight=1, init=None):
    """Exponentially smooth a 1-D series, skipping leading NaNs.

    The smoothing factor is ``alpha = weight / (periods + weight - 1)``:
    ``weight=1`` gives ``1 / periods`` and ``weight=2`` gives
    ``2 / (periods + 1)``. Each output follows the recurrence
    ``y[i] = (1 - alpha) * y[i - 1] + alpha * x[i]``.

    The recurrence is evaluated directly, so there are no large negative
    powers of ``1 - alpha`` to overflow on long inputs, and an ``init`` of
    ``0.0`` is honoured as a real seed value.

    Args:
        arr: 1-D sequence or array of numbers; leading NaNs are allowed.
        periods: Smoothing period. The first ``periods`` outputs after the
            leading NaNs are set to NaN.
        weight: Numerator of the smoothing factor (see above).
        init: Optional value of the average just before the first valid
            sample. When None, the first output equals the first valid
            sample.

    Returns:
        Float array of the same length as ``arr``. It is entirely NaN
        when ``arr`` holds no valid sample.
    """
    values = np.asarray(arr, dtype=float)
    valid_positions = np.flatnonzero(~np.isnan(values))
    if valid_positions.size == 0:
        # Nothing to smooth: empty input or NaN everywhere.
        return np.full(values.shape, np.nan)

    leading_na = int(valid_positions[0])
    values = values[leading_na:]

    alpha = weight / (periods + (weight - 1))
    decay = 1 - alpha

    smoothed = np.empty(values.shape[0])
    if init is None:
        # Seed with the first sample so the first output equals it exactly.
        previous = values[0]
        smoothed[0] = previous
        first = 1
    else:
        previous = init
        first = 0
    for i, sample in enumerate(values[first:], start=first):
        previous = decay * previous + alpha * sample
        smoothed[i] = previous

    # Mask the warm-up window, then restore the original leading NaNs.
    smoothed[:periods] = np.nan
    return np.concatenate((np.full(leading_na, np.nan), smoothed))


def atr(highs, lows, closes, periods=14, ema_weight=1):
    """Average true range of a high/low/close series.

    For every bar after the first, the true range is the largest of
    ``|high - previous close|``, ``|low - previous close|`` and
    ``high - low``. That series is smoothed with ``ema`` and a NaN is
    prepended for the first bar, which has no previous close.

    Args:
        highs, lows, closes: Equal-length sequences of prices.
        periods: Smoothing period passed to ``ema``.
        ema_weight: Passed to ``ema`` as ``weight``.

    Returns:
        Array with one entry per input bar.
    """
    high_arr = np.array(highs)
    low_arr = np.array(lows)
    close_arr = np.array(closes)

    prev_close = close_arr[:-1]
    candidates = np.vstack([
        np.abs(high_arr[1:] - prev_close),
        np.abs(low_arr[1:] - prev_close),
        (high_arr - low_arr)[1:],
    ])
    true_range = candidates.max(axis=0)

    smoothed = ema(true_range, periods=periods, weight=ema_weight)
    return np.concatenate([[np.nan], smoothed])


def adx(highs, lows, closes, periods=14):
    """Average directional index of a high/low/close series.

    Steps:
      1. Bar-to-bar rise in highs and fall in lows.
      2. +DM keeps the rise where it exceeds the fall, -DM keeps the fall
         where it exceeds the rise; negative entries are zeroed.
      3. +DI / -DI are ``100 * ema(DM) / atr``.
      4. DX is ``100 * |+DI - -DI| / (+DI + -DI)``, smoothed by ``ema``
         after a NaN is prepended for the first bar.

    Args:
        highs, lows, closes: Equal-length sequences of prices.
        periods: Smoothing period used for every ``ema`` / ``atr`` call.

    Returns:
        Array with one entry per input bar.
    """
    high_arr = np.array(highs)
    low_arr = np.array(lows)
    close_arr = np.array(closes)

    rise = high_arr[1:] - high_arr[:-1]
    fall = low_arr[:-1] - low_arr[1:]

    # +DM: the rise, where it beats the fall and is not negative.
    rise_wins = rise > fall
    plus_dm = np.zeros(len(rise))
    plus_dm[rise_wins] = rise[rise_wins]
    plus_dm[plus_dm < 0] = 0

    # -DM: the fall, where it beats the rise and is not negative.
    fall_wins = fall > rise
    minus_dm = np.zeros(len(fall))
    minus_dm[fall_wins] = fall[fall_wins]
    minus_dm[minus_dm < 0] = 0

    # Drop the first ATR entry so it lines up with the per-bar differences.
    avg_true_range = atr(high_arr, low_arr, close_arr, periods)[1:]
    plus_di = 100 * ema(plus_dm, periods) / avg_true_range
    minus_di = 100 * ema(minus_dm, periods) / avg_true_range

    # Where both indicators are zero, nudge -DI so the division is defined.
    no_movement = (plus_di + minus_di == 0)
    minus_di[no_movement] = .0000001

    dx = 100 * np.abs(plus_di - minus_di) / (plus_di + minus_di)
    return ema(np.concatenate([[np.nan], dx]), periods)