在分组数据上使用 PySpark Imputer
Using PySpark Imputer on grouped data
我有一个 Class
列,它可以是 1、2 或 3,而另一列 Age
有一些缺失的数据。我想估算每个 Class
组的平均值 Age
。
我想一起做点什么:
grouped_data = df.groupBy('Class')
imputer = Imputer(inputCols=['Age'], outputCols=['imputed_Age'])
imputer.fit(grouped_data)
有什么解决方法吗?
感谢您的宝贵时间
您需要使用拟合模型转换数据框。然后取填充数据的平均值:
from pyspark.sql import functions as F
imputer = Imputer(inputCols=['Age'], outputCols=['imputed_Age'])
imp_model = imputer.fit(df)
transformed_df = imp_model.transform(df)
transformed_df \
.groupBy('Class') \
.agg(F.avg('Age'))
使用 Imputer,您可以将数据集过滤到每个 Class
值,估算平均值,然后将它们连接回去,因为您提前知道值可以是什么:
subsets = []
for i in range(1, 4):
imputer = Imputer(inputCols=['Age'], outputCols=['imputed_Age'])
subset_df = df.filter(col('Class') == i)
imputed_subset = imputer.fit(subset_df).transform(subset_df)
subsets.append(imputed_subset)
# Union them together
# If you only have 3 just do it without a loop
imputed_df = subsets[0].unionByName(subsets[1]).unionByName(subsets[2])
如果您事先不知道这些值是什么,或者如果它们不容易迭代,您可以 groupBy,将每个组的平均值作为 DataFrame,然后将其合并回您的原始数据框。
import pyspark.sql.functions as F
averages = df.groupBy("Class").agg(F.avg("Age").alias("avgAge"))
df_with_avgs = df.join(averages, on="Class")
imputed_df = df_with_avgs.withColumn("imputedAge", F.coalesce("Age", "avgAge"))
我有一个 Class
列,它可以是 1、2 或 3,而另一列 Age
有一些缺失的数据。我想估算每个 Class
组的平均值 Age
。
我想一起做点什么:
grouped_data = df.groupBy('Class')
imputer = Imputer(inputCols=['Age'], outputCols=['imputed_Age'])
imputer.fit(grouped_data)
有什么解决方法吗?
感谢您的宝贵时间
您需要使用拟合模型转换数据框。然后取填充数据的平均值:
from pyspark.sql import functions as F
imputer = Imputer(inputCols=['Age'], outputCols=['imputed_Age'])
imp_model = imputer.fit(df)
transformed_df = imp_model.transform(df)
transformed_df \
.groupBy('Class') \
.agg(F.avg('Age'))
使用 Imputer,您可以将数据集过滤到每个 Class
值,估算平均值,然后将它们连接回去,因为您提前知道值可以是什么:
subsets = []
for i in range(1, 4):
imputer = Imputer(inputCols=['Age'], outputCols=['imputed_Age'])
subset_df = df.filter(col('Class') == i)
imputed_subset = imputer.fit(subset_df).transform(subset_df)
subsets.append(imputed_subset)
# Union them together
# If you only have 3 just do it without a loop
imputed_df = subsets[0].unionByName(subsets[1]).unionByName(subsets[2])
如果您事先不知道这些值是什么,或者如果它们不容易迭代,您可以 groupBy,将每个组的平均值作为 DataFrame,然后将其合并回您的原始数据框。
import pyspark.sql.functions as F
averages = df.groupBy("Class").agg(F.avg("Age").alias("avgAge"))
df_with_avgs = df.join(averages, on="Class")
imputed_df = df_with_avgs.withColumn("imputedAge", F.coalesce("Age", "avgAge"))