转换 Spark Dataframe 列
Transforming Spark Dataframe Column
我正在使用 Spark 数据帧。我的数据框中有一个具有多个级别的分类变量。我正在尝试对此变量进行简单转换 - 仅选择具有大于 n 个观察值(例如,1000)的前几个级别。将所有其他级别归为 "Others" 类别。
我是 Spark 的新手,所以我一直在努力实现它。这是我到目前为止能够实现的目标:
# Extract all levels having > 1000 observations (df is the dataframe name)
val levels_count = df.groupBy("Col_name").count.filter("count >10000").sort(desc("count"))
# Extract the level names
val level_names = level_count.select("Col_name").rdd.map(x => x(0)).collect
这为我提供了一个数组,其中包含我想要保留的关卡名称。接下来,我应该定义可应用于该列的转换函数。这就是我卡住的地方。我相信我们需要创建一个用户定义的函数。这是我试过的:
# Define UDF
val var_transform = udf((x: String) => {
if (level_names contains x) x
else "others"
})
# Apply UDF to the column
val df_new = df.withColumn("Var_new", var_transform($"Col_name"))
但是,当我尝试 df_new.show
时,它会抛出一个 "Task not serializable" 异常。我究竟做错了什么?另外,有没有更好的方法来做到这一点?
谢谢!
在我看来,对于这种简单的转换,这是一个更好的解决方案:坚持使用 DataFrame API 并相信要优化的催化剂和 Tungsten(例如进行广播连接):
val levels_count = df
.groupBy($"Col_name".as("new_col_name"))
.count
.filter("count >10000")
val df_new = df
.join(levels_count,$"Col_name"===$"new_col_name", joinType="leftOuter")
.drop("Col_name")
.withColumn("new_col_name",coalesce($"new_col_name", lit("other")))
我正在使用 Spark 数据帧。我的数据框中有一个具有多个级别的分类变量。我正在尝试对此变量进行简单转换 - 仅选择具有大于 n 个观察值(例如,1000)的前几个级别。将所有其他级别归为 "Others" 类别。
我是 Spark 的新手,所以我一直在努力实现它。这是我到目前为止能够实现的目标:
# Extract all levels having > 1000 observations (df is the dataframe name)
val levels_count = df.groupBy("Col_name").count.filter("count >10000").sort(desc("count"))
# Extract the level names
val level_names = level_count.select("Col_name").rdd.map(x => x(0)).collect
这为我提供了一个数组,其中包含我想要保留的关卡名称。接下来,我应该定义可应用于该列的转换函数。这就是我卡住的地方。我相信我们需要创建一个用户定义的函数。这是我试过的:
# Define UDF
val var_transform = udf((x: String) => {
if (level_names contains x) x
else "others"
})
# Apply UDF to the column
val df_new = df.withColumn("Var_new", var_transform($"Col_name"))
但是,当我尝试 df_new.show
时,它会抛出一个 "Task not serializable" 异常。我究竟做错了什么?另外,有没有更好的方法来做到这一点?
谢谢!
在我看来,对于这种简单的转换,这是一个更好的解决方案:坚持使用 DataFrame API 并相信要优化的催化剂和 Tungsten(例如进行广播连接):
val levels_count = df
.groupBy($"Col_name".as("new_col_name"))
.count
.filter("count >10000")
val df_new = df
.join(levels_count,$"Col_name"===$"new_col_name", joinType="leftOuter")
.drop("Col_name")
.withColumn("new_col_name",coalesce($"new_col_name", lit("other")))