转换 Spark Dataframe 列

Transforming Spark Dataframe Column

我正在使用 Spark 数据帧。我的数据框中有一个具有多个级别的分类变量。我正在尝试对此变量进行简单转换 - 仅选择具有大于 n 个观察值(例如,1000)的前几个级别。将所有其他级别归为 "Others" 类别。

我是 Spark 的新手,所以我一直在努力实现它。这是我到目前为止能够实现的目标:

# Extract all levels having > 1000 observations (df is the dataframe name) 
val levels_count = df.groupBy("Col_name").count.filter("count >10000").sort(desc("count"))

# Extract the level names
val level_names = level_count.select("Col_name").rdd.map(x => x(0)).collect

这为我提供了一个数组,其中包含我想要保留的关卡名称。接下来,我应该定义可应用于该列的转换函数。这就是我卡住的地方。我相信我们需要创建一个用户定义的函数。这是我试过的:

# Define UDF
val var_transform = udf((x: String) => {
    if (level_names contains x) x
    else "others"
 })

# Apply UDF to the column
val df_new = df.withColumn("Var_new", var_transform($"Col_name"))

但是,当我尝试 df_new.show 时,它会抛出一个 "Task not serializable" 异常。我究竟做错了什么?另外,有没有更好的方法来做到这一点?

谢谢!

在我看来,对于这种简单的转换,这是一个更好的解决方案:坚持使用 DataFrame API 并相信要优化的催化剂和 Tungsten(例如进行广播连接):

val levels_count = df
  .groupBy($"Col_name".as("new_col_name"))
  .count
  .filter("count >10000")

val df_new = df
  .join(levels_count,$"Col_name"===$"new_col_name", joinType="leftOuter")
  .drop("Col_name")
  .withColumn("new_col_name",coalesce($"new_col_name", lit("other")))