在 Pyspark 中迭代保存到新的 DataFrame
Saving iteratively to a new DataFrame in Pyspark
我正在基于 3 个不同的 PySpark 数据帧执行计算。
从某种意义上说,该脚本可以正常工作,它可以按应有的方式执行计算,但是,我很难正确处理上述计算的结果。
import sys
import numpy as np
from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)
# Dummy Data
df = sqlContext.createDataFrame([[0,1,0,0,0],[1,1,0,0,1],[0,0,1,0,1],[1,0,1,1,0],[1,1,0,0,0]], ['p1', 'p2', 'p3', 'p4', 'p5'])
df.show()
+---+---+---+---+---+
| p1| p2| p3| p4| p5|
+---+---+---+---+---+
| 0| 1| 0| 0| 0|
| 1| 1| 0| 0| 1|
| 0| 0| 1| 0| 1|
| 1| 0| 1| 1| 0|
| 1| 1| 0| 0| 0|
+---+---+---+---+---+
# Values
values = sqlContext.createDataFrame([(0,1,'p1'),(None,1,'p2'),(0,0,'p3'),(None,0, 'p4'),(1,None,'p5')], ('f1', 'f2','index'))
values.show()
+----+----+-----+
| f1| f2|index|
+----+----+-----+
| 0| 1| p1|
|null| 1| p2|
| 0| 0| p3|
|null| 0| p4|
| 1|null| p5|
+----+----+-----+
# Weights
weights = sqlContext.createDataFrame([(4,3,'p1'),(None,1,'p2'),(2,2,'p3'),(None, 3, 'p4'),(3,None,'p5')], ('f1', 'f2','index'))
weights.show()
+----+----+-----+
| f1| f2|index|
+----+----+-----+
| 4| 3| p1|
|null| 1| p2|
| 2| 2| p3|
|null| 3| p4|
| 3|null| p5|
+----+----+-----+
# Function: it sums the vector W for the values of Row equal to the value of V and then divide by the length of V.
# If there a no similarities between Row and V outputs 0
def W_sum(row,v,w):
if len(w[row==v])>0:
return float(np.sum(w[row==v])/len(w))
else:
return 0.0
对于 Data 中的每一列和每一行,应用上述函数。
# We iterate over the columns of Values (except the last one called index)
for val in values.columns[:-1]:
# we filter the data to work only with the columns that are defined for the selected Value
defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
# we select only the useful columns
df_select= df.select(defined_col)
# we retrieve the reference value and weights
V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
df_select.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in df_select.columns))))
这给出:
+---+---+---+---+---+---+
| p1| p2| p3| p4| p5| f1|
+---+---+---+---+---+---+
| 0| 1| 0| 0| 0|2.0|
| 1| 1| 0| 0| 1|1.0|
| 0| 0| 1| 0| 1|2.0|
| 1| 0| 1| 1| 0|0.0|
| 1| 1| 0| 0| 0|0.0|
+---+---+---+---+---+---+
它按照我的要求将列添加到切片的 DataFrame 中。问题是我宁愿将数据收集到一个新的数据中,最后我可以访问它来查询结果。
可以像 pandas?
那样在 PySpark 中增长(稍微有效地)DataFrame
编辑以使我的目标更清晰:
理想情况下,我会得到一个只有计算列的 DataFrame,如下所示:
+---+---+
| f1| f2|
+---+---+
|2.0|1.0|
|1.0|2.0|
|2.0|0.0|
|0.0|0.0|
|0.0|2.0|
+---+---+
你的问题有些问题...
首先,您的 for
循环会产生一个错误,因为最后一行的 df_select
没有定义;最后也没有赋值(它产生什么?)。
假设 df_select
实际上是您的 subsubsample
数据框,之前定义了一些行,并且您的最后一行类似于
new_df = subsubsample.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in subsubsample.columns))))
然后你的问题开始变得更清楚了。自
values.columns[:-1]
# ['f1', 'f2']
整个循环的结果就是
+---+---+---+---+---+
| p1| p2| p3| p4| f2|
+---+---+---+---+---+
| 0| 1| 0| 0|1.0|
| 1| 1| 0| 0|2.0|
| 0| 0| 1| 0|0.0|
| 1| 0| 1| 1|0.0|
| 1| 1| 0| 0|2.0|
+---+---+---+---+---+
即仅包含 f2
列(自然,因为带有 f1
的结果会被简单地覆盖)。
现在,正如我所说,假设情况是这样的,而您的问题实际上是如何同时拥有两列 f1
和 f2
在一起而不是在不同的数据框中,你可以忘记 subsubsample
并将列附加到你的初始 df
,之后可能会删除不需要的列:
init_cols = df.columns
init_cols
# ['p1', 'p2', 'p3', 'p4', 'p5']
new_df = df
for val in values.columns[:-1]:
# we filter the data to work only with the columns that are defined for the selected Value
defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
# we retrieve the reference value and weights
V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
new_df = new_df.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in defined_col)))) # change here
# drop initial columns:
for i in init_cols:
new_df = new_df.drop(i)
结果 new_df
将是:
+---+---+
| f1| f2|
+---+---+
|2.0|1.0|
|1.0|2.0|
|2.0|0.0|
|0.0|0.0|
|0.0|2.0|
+---+---+
更新(评论后):要强制 W_sum
函数中的除法为浮点数,请使用:
from __future__ import division
new_df
现在将是:
+---------+----+
| f1| f2|
+---------+----+
| 2.0| 1.5|
|1.6666666|2.25|
|2.3333333|0.75|
| 0.0|0.75|
|0.6666667|2.25|
+---------+----+
与 f2
完全符合您的评论。
我正在基于 3 个不同的 PySpark 数据帧执行计算。
从某种意义上说,该脚本可以正常工作,它可以按应有的方式执行计算,但是,我很难正确处理上述计算的结果。
import sys
import numpy as np
from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)
# Dummy Data
df = sqlContext.createDataFrame([[0,1,0,0,0],[1,1,0,0,1],[0,0,1,0,1],[1,0,1,1,0],[1,1,0,0,0]], ['p1', 'p2', 'p3', 'p4', 'p5'])
df.show()
+---+---+---+---+---+
| p1| p2| p3| p4| p5|
+---+---+---+---+---+
| 0| 1| 0| 0| 0|
| 1| 1| 0| 0| 1|
| 0| 0| 1| 0| 1|
| 1| 0| 1| 1| 0|
| 1| 1| 0| 0| 0|
+---+---+---+---+---+
# Values
values = sqlContext.createDataFrame([(0,1,'p1'),(None,1,'p2'),(0,0,'p3'),(None,0, 'p4'),(1,None,'p5')], ('f1', 'f2','index'))
values.show()
+----+----+-----+
| f1| f2|index|
+----+----+-----+
| 0| 1| p1|
|null| 1| p2|
| 0| 0| p3|
|null| 0| p4|
| 1|null| p5|
+----+----+-----+
# Weights
weights = sqlContext.createDataFrame([(4,3,'p1'),(None,1,'p2'),(2,2,'p3'),(None, 3, 'p4'),(3,None,'p5')], ('f1', 'f2','index'))
weights.show()
+----+----+-----+
| f1| f2|index|
+----+----+-----+
| 4| 3| p1|
|null| 1| p2|
| 2| 2| p3|
|null| 3| p4|
| 3|null| p5|
+----+----+-----+
# Function: it sums the vector W for the values of Row equal to the value of V and then divide by the length of V.
# If there a no similarities between Row and V outputs 0
def W_sum(row,v,w):
if len(w[row==v])>0:
return float(np.sum(w[row==v])/len(w))
else:
return 0.0
对于 Data 中的每一列和每一行,应用上述函数。
# We iterate over the columns of Values (except the last one called index)
for val in values.columns[:-1]:
# we filter the data to work only with the columns that are defined for the selected Value
defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
# we select only the useful columns
df_select= df.select(defined_col)
# we retrieve the reference value and weights
V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
df_select.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in df_select.columns))))
这给出:
+---+---+---+---+---+---+
| p1| p2| p3| p4| p5| f1|
+---+---+---+---+---+---+
| 0| 1| 0| 0| 0|2.0|
| 1| 1| 0| 0| 1|1.0|
| 0| 0| 1| 0| 1|2.0|
| 1| 0| 1| 1| 0|0.0|
| 1| 1| 0| 0| 0|0.0|
+---+---+---+---+---+---+
它按照我的要求将列添加到切片的 DataFrame 中。问题是我宁愿将数据收集到一个新的数据中,最后我可以访问它来查询结果。
可以像 pandas?
编辑以使我的目标更清晰:
理想情况下,我会得到一个只有计算列的 DataFrame,如下所示:
+---+---+
| f1| f2|
+---+---+
|2.0|1.0|
|1.0|2.0|
|2.0|0.0|
|0.0|0.0|
|0.0|2.0|
+---+---+
你的问题有些问题...
首先,您的 for
循环会产生一个错误,因为最后一行的 df_select
没有定义;最后也没有赋值(它产生什么?)。
假设 df_select
实际上是您的 subsubsample
数据框,之前定义了一些行,并且您的最后一行类似于
new_df = subsubsample.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in subsubsample.columns))))
然后你的问题开始变得更清楚了。自
values.columns[:-1]
# ['f1', 'f2']
整个循环的结果就是
+---+---+---+---+---+
| p1| p2| p3| p4| f2|
+---+---+---+---+---+
| 0| 1| 0| 0|1.0|
| 1| 1| 0| 0|2.0|
| 0| 0| 1| 0|0.0|
| 1| 0| 1| 1|0.0|
| 1| 1| 0| 0|2.0|
+---+---+---+---+---+
即仅包含 f2
列(自然,因为带有 f1
的结果会被简单地覆盖)。
现在,正如我所说,假设情况是这样的,而您的问题实际上是如何同时拥有两列 f1
和 f2
在一起而不是在不同的数据框中,你可以忘记 subsubsample
并将列附加到你的初始 df
,之后可能会删除不需要的列:
init_cols = df.columns
init_cols
# ['p1', 'p2', 'p3', 'p4', 'p5']
new_df = df
for val in values.columns[:-1]:
# we filter the data to work only with the columns that are defined for the selected Value
defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
# we retrieve the reference value and weights
V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
new_df = new_df.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in defined_col)))) # change here
# drop initial columns:
for i in init_cols:
new_df = new_df.drop(i)
结果 new_df
将是:
+---+---+
| f1| f2|
+---+---+
|2.0|1.0|
|1.0|2.0|
|2.0|0.0|
|0.0|0.0|
|0.0|2.0|
+---+---+
更新(评论后):要强制 W_sum
函数中的除法为浮点数,请使用:
from __future__ import division
new_df
现在将是:
+---------+----+
| f1| f2|
+---------+----+
| 2.0| 1.5|
|1.6666666|2.25|
|2.3333333|0.75|
| 0.0|0.75|
|0.6666667|2.25|
+---------+----+
与 f2
完全符合您的评论。