在 Spark Dataframe 中创建具有函数的新列

Create new column with function in Spark Dataframe

我正在尝试找出 Spark 中的新数据框 API。似乎向前迈出了一大步,但在做一些本应非常简单的事情时却遇到了麻烦。我有一个包含 2 列的数据框,“ID”和“金额”。作为一个通用示例,假设我想要 return 一个名为“代码”的新列,该列 return 是一个基于“Amt”值的代码。我可以编写这样的函数:

def coder(myAmt:Integer):String {
  if (myAmt > 100) "Little"
  else "Big"
}

当我尝试这样使用它时:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", coder(myDF("Amt")))

我收到类型不匹配错误

found   : org.apache.spark.sql.Column
required: Integer

我尝试将函数的输入类型更改为 org.apache.spark.sql.Column,但随后我开始在函数编译时遇到错误,因为它需要 if 语句中的布尔值。

我做错了吗?除了使用 withColumn 之外,还有 better/another 方法吗?

感谢您的帮助。

假设您的架构中有 "Amt" 列:

import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))

我认为 withColumn 是添加列的正确方法

由于 serializationdeserialization 列的开销,我们应该尽可能避免定义 udf 函数。

您可以使用简单的 when spark 函数实现解决方案,如下所示

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))

另一种方法: 您可以创建任何函数,但根据上述错误,您应该将函数定义为变量

示例:

val coder = udf((myAmt:Integer) => {
  if (myAmt > 100) "Little"
  else "Big"
})

现在这条语句完美运行:

myDF.withColumn("Code", coder(myDF("Amt")))