Spark Scala 数据帧转换

Spark Scala Dataframe Conversion

我有一个用 ~ 分隔的文本文件,我需要在转换为数据帧之前执行一些解析。当 RDD[String] 进行一些解析时,代码读入一个文本文件。然后,它转换为 RDD[Row]。然后使用模式创建一个数据框。

下面是我的代码。它有效,但问题是实际模式有 400 个字段长。我想知道是否有比键入 attributes(1)、attributes(2)、attributes(3)... 等更简单的方法。

我目前使用的是 Spark 1.6。 CDH 5.2.2

示例输入:

20161481132310 ~     ~"This" is a comma 10

当前代码:

val schema_1 = StructType(Array(
StructField("EXAMPLE_1", StringType, true),
StructField("EXAMPLE_2", StringType, true),
StructField("EXAMPLE_3", StringType, true)))

val rdd = sc.textFile("example.txt")
val rdd_truncate =  rdd.map(_.split("~").map(_.trim).mkString("~"))
val row_final = rdd_truncate
  .map(_.split("~"))
  .map(attributes => Row(attributes(0),
    attributes(1),
    attributes(2)))

val df = sqlContext.createDataFrame(row_final, schema_1)

根据建议修改如下。它适用于引号。输入中的 "This" 将失败。有什么建议吗?

val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("delimiter","~")
      .schema(schema)
      .load("example.txt")
val df_final = df.select(df.columns.map(c =>trim(col(c)).alias(c)): _*)

只需使用标准 CSV reader:

spark.read.schema(schema).option("delimiter", "~").csv("example.txt")

如果你想 trim 字段只需使用 select:

import org.apache.spark.sql.functions.{col, trim}

df.select(df.columns.map(c => trim(col(c)).alias(c)): _*)

如果你使用 Spark 1.x 你可以使用 spark-csv:

sqlContext.read
  .format("com.databricks.spark.csv")
  .schema(schema)
  .option("delimiter", "~")
  .load("example.txt")

如果由于某种原因这还不够,您可以使用 Row.fromSeq:

Row.fromSeq(line.split("~").take(3))