数据框到 RDD[Row] 用空值替换 space
Dataframe to RDD[Row] replacing space with nulls
我正在将 Spark 数据帧转换为 RDD[Row],这样我就可以将其映射到最终模式以写入 Hive Orc table。我想将输入中的任何 space 转换为实际的 null
,以便配置单元 table 可以存储实际的 null
而不是空字符串。
输入 DataFrame(单列,管道分隔值):
col1
1|2|3||5|6|7|||...|
我的代码:
inputDF.rdd.
map { x: Row => x.get(0).asInstanceOf[String].split("\|", -1)}.
map { x => Row (nullConverter(x(0)),nullConverter(x(1)),nullConverter(x(2)).... nullConverter(x(200)))}
def nullConverter(input: String): String = {
if (input.trim.length > 0) input.trim
else null
}
有什么干净的方法可以做到这一点而不是调用 nullConverter
函数 200 次。
基于单列的更新:
按照你的方法,我会做类似的事情:
inputDf.rdd.map((row: Row) => {
val values = row.get(0).asInstanceOf[String].split("\|").map(nullConverter)
Row(values)
})
使您的 nullConverter 或任何其他逻辑成为 udf:
import org.apache.spark.sql.functions._
val nullConverter = udf((input: String) => {
if (input.trim.length > 0) input.trim
else null
})
现在,在你的 df 上使用 udf 并应用到所有列:
val convertedDf = inputDf.select(inputDf.columns.map(c => nullConverter(col(c)).alias(c)):_*)
现在,您可以执行 RDD 逻辑了。
在转换为 RDD 之前,使用 DataFrame API 会更容易做到这一点。一、拆分数据:
val df = Seq(("1|2|3||5|6|7|8||")).toDF("col0") // Example dataframe
val df2 = df.withColumn("col0", split($"col0", "\|")) // Split on "|"
然后求出数组的长度:
val numCols = df2.first.getAs[Seq[String]](0).length
现在,对于数组中的每个元素,使用 nullConverter UDF
然后将其分配给它自己的列。
val nullConverter = udf((input: String) => {
if (input.trim.length > 0) input.trim
else null
})
val df3 = df2.select((0 until numCols).map(i => nullConverter($"col0".getItem(i)).as("col" + i)): _*)
使用示例数据框的结果:
+----+----+----+----+----+----+----+----+----+----+
|col0|col1|col2|col3|col4|col5|col6|col7|col8|col9|
+----+----+----+----+----+----+----+----+----+----+
| 1| 2| 3|null| 5| 6| 7| 8|null|null|
+----+----+----+----+----+----+----+----+----+----+
现在根据您的需要将其转换为 RDD 或继续将数据用作 DataFrame。
将dataframe转换为rdd没有意义
import org.apache.spark.sql.functions._
df = sc.parallelize([
(1, "foo bar"), (2, "foobar "), (3, " ")
]).toDF(["k", "v"])
df.select(regexp_replace(col("*"), " ", "NULL"))
我正在将 Spark 数据帧转换为 RDD[Row],这样我就可以将其映射到最终模式以写入 Hive Orc table。我想将输入中的任何 space 转换为实际的 null
,以便配置单元 table 可以存储实际的 null
而不是空字符串。
输入 DataFrame(单列,管道分隔值):
col1
1|2|3||5|6|7|||...|
我的代码:
inputDF.rdd.
map { x: Row => x.get(0).asInstanceOf[String].split("\|", -1)}.
map { x => Row (nullConverter(x(0)),nullConverter(x(1)),nullConverter(x(2)).... nullConverter(x(200)))}
def nullConverter(input: String): String = {
if (input.trim.length > 0) input.trim
else null
}
有什么干净的方法可以做到这一点而不是调用 nullConverter
函数 200 次。
基于单列的更新:
按照你的方法,我会做类似的事情:
inputDf.rdd.map((row: Row) => {
val values = row.get(0).asInstanceOf[String].split("\|").map(nullConverter)
Row(values)
})
使您的 nullConverter 或任何其他逻辑成为 udf:
import org.apache.spark.sql.functions._
val nullConverter = udf((input: String) => {
if (input.trim.length > 0) input.trim
else null
})
现在,在你的 df 上使用 udf 并应用到所有列:
val convertedDf = inputDf.select(inputDf.columns.map(c => nullConverter(col(c)).alias(c)):_*)
现在,您可以执行 RDD 逻辑了。
在转换为 RDD 之前,使用 DataFrame API 会更容易做到这一点。一、拆分数据:
val df = Seq(("1|2|3||5|6|7|8||")).toDF("col0") // Example dataframe
val df2 = df.withColumn("col0", split($"col0", "\|")) // Split on "|"
然后求出数组的长度:
val numCols = df2.first.getAs[Seq[String]](0).length
现在,对于数组中的每个元素,使用 nullConverter UDF
然后将其分配给它自己的列。
val nullConverter = udf((input: String) => {
if (input.trim.length > 0) input.trim
else null
})
val df3 = df2.select((0 until numCols).map(i => nullConverter($"col0".getItem(i)).as("col" + i)): _*)
使用示例数据框的结果:
+----+----+----+----+----+----+----+----+----+----+
|col0|col1|col2|col3|col4|col5|col6|col7|col8|col9|
+----+----+----+----+----+----+----+----+----+----+
| 1| 2| 3|null| 5| 6| 7| 8|null|null|
+----+----+----+----+----+----+----+----+----+----+
现在根据您的需要将其转换为 RDD 或继续将数据用作 DataFrame。
将dataframe转换为rdd没有意义
import org.apache.spark.sql.functions._
df = sc.parallelize([
(1, "foo bar"), (2, "foobar "), (3, " ")
]).toDF(["k", "v"])
df.select(regexp_replace(col("*"), " ", "NULL"))