Spark Scala 数据帧转换
Spark Scala Dataframe Conversion
我有一个用 ~ 分隔的文本文件,我需要在转换为数据帧之前执行一些解析。当 RDD[String] 进行一些解析时,代码读入一个文本文件。然后,它转换为 RDD[Row]。然后使用模式创建一个数据框。
下面是我的代码。它有效,但问题是实际模式有 400 个字段长。我想知道是否有比键入 attributes(1)、attributes(2)、attributes(3)... 等更简单的方法。
我目前使用的是 Spark 1.6。 CDH 5.2.2
示例输入:
20161481132310 ~ ~"This" is a comma 10
当前代码:
val schema_1 = StructType(Array(
StructField("EXAMPLE_1", StringType, true),
StructField("EXAMPLE_2", StringType, true),
StructField("EXAMPLE_3", StringType, true)))
val rdd = sc.textFile("example.txt")
val rdd_truncate = rdd.map(_.split("~").map(_.trim).mkString("~"))
val row_final = rdd_truncate
.map(_.split("~"))
.map(attributes => Row(attributes(0),
attributes(1),
attributes(2)))
val df = sqlContext.createDataFrame(row_final, schema_1)
根据建议修改如下。它适用于引号。输入中的 "This" 将失败。有什么建议吗?
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("delimiter","~")
.schema(schema)
.load("example.txt")
val df_final = df.select(df.columns.map(c =>trim(col(c)).alias(c)): _*)
只需使用标准 CSV reader:
spark.read.schema(schema).option("delimiter", "~").csv("example.txt")
如果你想 trim 字段只需使用 select
:
import org.apache.spark.sql.functions.{col, trim}
df.select(df.columns.map(c => trim(col(c)).alias(c)): _*)
如果你使用 Spark 1.x 你可以使用 spark-csv
:
sqlContext.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("delimiter", "~")
.load("example.txt")
如果由于某种原因这还不够,您可以使用 Row.fromSeq
:
Row.fromSeq(line.split("~").take(3))
我有一个用 ~ 分隔的文本文件,我需要在转换为数据帧之前执行一些解析。当 RDD[String] 进行一些解析时,代码读入一个文本文件。然后,它转换为 RDD[Row]。然后使用模式创建一个数据框。
下面是我的代码。它有效,但问题是实际模式有 400 个字段长。我想知道是否有比键入 attributes(1)、attributes(2)、attributes(3)... 等更简单的方法。
我目前使用的是 Spark 1.6。 CDH 5.2.2
示例输入:
20161481132310 ~ ~"This" is a comma 10
当前代码:
val schema_1 = StructType(Array(
StructField("EXAMPLE_1", StringType, true),
StructField("EXAMPLE_2", StringType, true),
StructField("EXAMPLE_3", StringType, true)))
val rdd = sc.textFile("example.txt")
val rdd_truncate = rdd.map(_.split("~").map(_.trim).mkString("~"))
val row_final = rdd_truncate
.map(_.split("~"))
.map(attributes => Row(attributes(0),
attributes(1),
attributes(2)))
val df = sqlContext.createDataFrame(row_final, schema_1)
根据建议修改如下。它适用于引号。输入中的 "This" 将失败。有什么建议吗?
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("delimiter","~")
.schema(schema)
.load("example.txt")
val df_final = df.select(df.columns.map(c =>trim(col(c)).alias(c)): _*)
只需使用标准 CSV reader:
spark.read.schema(schema).option("delimiter", "~").csv("example.txt")
如果你想 trim 字段只需使用 select
:
import org.apache.spark.sql.functions.{col, trim}
df.select(df.columns.map(c => trim(col(c)).alias(c)): _*)
如果你使用 Spark 1.x 你可以使用 spark-csv
:
sqlContext.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("delimiter", "~")
.load("example.txt")
如果由于某种原因这还不够,您可以使用 Row.fromSeq
:
Row.fromSeq(line.split("~").take(3))