Pyspark 使用 Window 函数和我自己的函数
Pyspark using Window function with my own function
我有一个 Pandas 的代码,可以计算大小为 x 的 window 线性回归的 R2。查看我的代码:
def lr_r2_Sklearn(data):
data = np.array(data)
X = pd.Series(list(range(0,len(data),1))).values.reshape(-1,1)
Y = data.reshape(-1,1)
regressor = LinearRegression()
regressor.fit(X,Y)
return(regressor.score(X,Y))
r2_rolling = df[['value']].rolling(300).agg([lr_r2_Sklearn])
我正在制作尺寸为 300 的滚动并计算每个 window 的 r2。我希望做完全相同的事情,但使用 pyspark 和 spark 数据框。我知道我必须使用Window函数,但它比pandas更难理解,所以我迷路了......
我有这个,但我不知道如何让它发挥作用。
w = Window().partitionBy(lit(1)).rowsBetween(-299,0)
data.select(lr_r2('value').over(w).alias('r2')).show()
(lr_r2 return r2)
谢谢!
您需要一个带有 pandas 有界条件的 udf。这在 spark3.0 之前是不可能的,并且正在开发中。
在这里参考答案:
但是,您可以探索 pyspark 的 ml 包:
http://spark.apache.org/docs/2.4.0/api/python/pyspark.ml.html#pyspark.ml.classification.LinearSVC
因此,您可以定义一个模型,如 linearSVC,并在组装后将数据帧的各个部分传递给它。我建议使用由阶段、汇编器和分类器组成的管道,然后通过一些唯一的 id 对其进行过滤,使用数据框的各个部分在循环中调用它们。
我有一个 Pandas 的代码,可以计算大小为 x 的 window 线性回归的 R2。查看我的代码:
def lr_r2_Sklearn(data):
data = np.array(data)
X = pd.Series(list(range(0,len(data),1))).values.reshape(-1,1)
Y = data.reshape(-1,1)
regressor = LinearRegression()
regressor.fit(X,Y)
return(regressor.score(X,Y))
r2_rolling = df[['value']].rolling(300).agg([lr_r2_Sklearn])
我正在制作尺寸为 300 的滚动并计算每个 window 的 r2。我希望做完全相同的事情,但使用 pyspark 和 spark 数据框。我知道我必须使用Window函数,但它比pandas更难理解,所以我迷路了......
我有这个,但我不知道如何让它发挥作用。
w = Window().partitionBy(lit(1)).rowsBetween(-299,0)
data.select(lr_r2('value').over(w).alias('r2')).show()
(lr_r2 return r2)
谢谢!
您需要一个带有 pandas 有界条件的 udf。这在 spark3.0 之前是不可能的,并且正在开发中。
在这里参考答案: