在 Spark Dataframe 中创建具有函数的新列
Create new column with function in Spark Dataframe
我正在尝试找出 Spark 中的新数据框 API。似乎向前迈出了一大步,但在做一些本应非常简单的事情时却遇到了麻烦。我有一个包含 2 列的数据框,“ID”和“金额”。作为一个通用示例,假设我想要 return 一个名为“代码”的新列,该列 return 是一个基于“Amt”值的代码。我可以编写这样的函数:
def coder(myAmt:Integer):String {
if (myAmt > 100) "Little"
else "Big"
}
当我尝试这样使用它时:
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
myDF.withColumn("Code", coder(myDF("Amt")))
我收到类型不匹配错误
found : org.apache.spark.sql.Column
required: Integer
我尝试将函数的输入类型更改为 org.apache.spark.sql.Column,但随后我开始在函数编译时遇到错误,因为它需要 if 语句中的布尔值。
我做错了吗?除了使用 withColumn 之外,还有 better/another 方法吗?
感谢您的帮助。
假设您的架构中有 "Amt" 列:
import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))
我认为 withColumn 是添加列的正确方法
由于 serialization
和 deserialization
列的开销,我们应该尽可能避免定义 udf
函数。
您可以使用简单的 when
spark 函数实现解决方案,如下所示
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
另一种方法:
您可以创建任何函数,但根据上述错误,您应该将函数定义为变量
示例:
val coder = udf((myAmt:Integer) => {
if (myAmt > 100) "Little"
else "Big"
})
现在这条语句完美运行:
myDF.withColumn("Code", coder(myDF("Amt")))
我正在尝试找出 Spark 中的新数据框 API。似乎向前迈出了一大步,但在做一些本应非常简单的事情时却遇到了麻烦。我有一个包含 2 列的数据框,“ID”和“金额”。作为一个通用示例,假设我想要 return 一个名为“代码”的新列,该列 return 是一个基于“Amt”值的代码。我可以编写这样的函数:
def coder(myAmt:Integer):String {
if (myAmt > 100) "Little"
else "Big"
}
当我尝试这样使用它时:
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
myDF.withColumn("Code", coder(myDF("Amt")))
我收到类型不匹配错误
found : org.apache.spark.sql.Column
required: Integer
我尝试将函数的输入类型更改为 org.apache.spark.sql.Column,但随后我开始在函数编译时遇到错误,因为它需要 if 语句中的布尔值。
我做错了吗?除了使用 withColumn 之外,还有 better/another 方法吗?
感谢您的帮助。
假设您的架构中有 "Amt" 列:
import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))
我认为 withColumn 是添加列的正确方法
由于 serialization
和 deserialization
列的开销,我们应该尽可能避免定义 udf
函数。
您可以使用简单的 when
spark 函数实现解决方案,如下所示
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
另一种方法: 您可以创建任何函数,但根据上述错误,您应该将函数定义为变量
示例:
val coder = udf((myAmt:Integer) => {
if (myAmt > 100) "Little"
else "Big"
})
现在这条语句完美运行:
myDF.withColumn("Code", coder(myDF("Amt")))