Spark UDF 没有将列值从 null 更改为 0
The Spark UDF is not changing the column value from null to 0
正在尝试使用下面的 UDF 将 Dataframe 中的 null 替换为 0。
我可能出错的地方,代码看起来很简单,但没有按预期工作。
我试图创建一个 UDF,它在任何值为 null 的列中替换 0。
提前谢谢大家。
//imports
object PlayGround {
def missingValType2(n: Int):Int = {
if(n == null){
0
}else{
n
}
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.appName("PlayGround")
.config("spark.sql.warehouse.dir", "file:///C:/temp")
.master("local[*]")
.getOrCreate()
val missingValUDFType2 = udf[Int, Int](missingValType2)
val schema = List(
StructField("name", types.StringType, false),
StructField("age", types.IntegerType, true)
)
val data = Seq(
Row("miguel", null),
Row("luisa", 21)
)
val df = spark.createDataFrame(
spark.sparkContext.parallelize(data),
StructType(schema)
)
df.show(false)
df.withColumn("ageNullReplace",missingValUDFType2($"age")).show()
}
}
/**
* +------+----+
* |name |age |
* +------+----+
* |miguel|null|
* |luisa |21 |
* +------+----+
*
* Below is the current output.
* +------+----+--------------+
* | name| age|ageNullReplace|
* +------+----+--------------+
* |miguel|null| null|
* | luisa| 21| 21|
* +------+----+--------------+*/
预期输出:
* +------+----+--------------+
* | name| age|ageNullReplace|
* +------+----+--------------+
* |miguel|null| 0|
* | luisa| 21| 21|
* +------+----+--------------+
不需要UDF。您可以将 na.fill
应用于 DataFrame 中 type-specific 列的列表,如下所示:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("miguel", None), ("luisa", Some(21))
).toDF("name", "age")
df.na.fill(0, Seq("age")).show
// +------+---+
// | name|age|
// +------+---+
// |miguel| 0|
// | luisa| 21|
// +------+---+
您可以像下面这样在 when 条件下使用 WithColumn
代码未经测试
df.withColumn("ageNullReplace", when(col("age").isNull,lit(0)).otherwise(col(age)))
在上面的代码中,否则不需要仅供参考
希望对您有所帮助
正在尝试使用下面的 UDF 将 Dataframe 中的 null 替换为 0。 我可能出错的地方,代码看起来很简单,但没有按预期工作。
我试图创建一个 UDF,它在任何值为 null 的列中替换 0。
提前谢谢大家。
//imports
object PlayGround {
def missingValType2(n: Int):Int = {
if(n == null){
0
}else{
n
}
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.appName("PlayGround")
.config("spark.sql.warehouse.dir", "file:///C:/temp")
.master("local[*]")
.getOrCreate()
val missingValUDFType2 = udf[Int, Int](missingValType2)
val schema = List(
StructField("name", types.StringType, false),
StructField("age", types.IntegerType, true)
)
val data = Seq(
Row("miguel", null),
Row("luisa", 21)
)
val df = spark.createDataFrame(
spark.sparkContext.parallelize(data),
StructType(schema)
)
df.show(false)
df.withColumn("ageNullReplace",missingValUDFType2($"age")).show()
}
}
/**
* +------+----+
* |name |age |
* +------+----+
* |miguel|null|
* |luisa |21 |
* +------+----+
*
* Below is the current output.
* +------+----+--------------+
* | name| age|ageNullReplace|
* +------+----+--------------+
* |miguel|null| null|
* | luisa| 21| 21|
* +------+----+--------------+*/
预期输出:
* +------+----+--------------+
* | name| age|ageNullReplace|
* +------+----+--------------+
* |miguel|null| 0|
* | luisa| 21| 21|
* +------+----+--------------+
不需要UDF。您可以将 na.fill
应用于 DataFrame 中 type-specific 列的列表,如下所示:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("miguel", None), ("luisa", Some(21))
).toDF("name", "age")
df.na.fill(0, Seq("age")).show
// +------+---+
// | name|age|
// +------+---+
// |miguel| 0|
// | luisa| 21|
// +------+---+
您可以像下面这样在 when 条件下使用 WithColumn 代码未经测试
df.withColumn("ageNullReplace", when(col("age").isNull,lit(0)).otherwise(col(age)))
在上面的代码中,否则不需要仅供参考
希望对您有所帮助