在 Spark 中向 DataFrame 添加新列

Additing a new column to DataFrame in Spark

我想向 Spark(Scala) 中的 DataFrame 添加一个带有行 ID 的新列。这是我采取的方法。我正在创建一个带有索引 ID 的新行和一个包含另一个 StructField 的新 StructType

 val rdd = df.rdd.zipWithIndex().map(indexedRow => Row.fromSeq(indexedRow._2.toString ++ indexedRow._1.toSeq ))
 val list = StructType(Seq(StructField("Row Number", StringType, true)).++(df.schema.fields))
 sqlContext.createDataFrame(rdd, list).show() // fails

我在 运行.

时遇到以下异常
scala.MatchError: 0 (of class java.lang.Character)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:492)
    at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:492)

但 structtype 和 rdd 具有预期的输出。 谁能帮我这个?请

我试过 Spark2.10 1.6.0 和 1.6.1 版本

您只有一个小错误,那就是将字符串值添加到字段序列中 - 而不是:

indexedRow._2.toString ++ indexedRow._1.toSeq

你应该使用:

indexedRow._2.toString +: indexedRow._1.toSeq

第一个实现实际上将字符串转换为 Seq[Char] 然后 连接 这两个序列,所以你最终得到的是 Seq('1', '2', "f1Val", "f2Val") 而不是Seq("12", "f1Val", "f2Val")。您看到的异常是 Spark 试图将第一个 Char 解析为 StringType 但失败了。

查看 this 答案以获得为 rdd 行分配唯一 ID 的更好方法(RDD.zipWithUniqueId)