在 Spark 中向 DataFrame 添加新列
Additing a new column to DataFrame in Spark
我想向 Spark(Scala)
中的 DataFrame
添加一个带有行 ID 的新列。这是我采取的方法。我正在创建一个带有索引 ID 的新行和一个包含另一个 StructField
的新 StructType
。
val rdd = df.rdd.zipWithIndex().map(indexedRow => Row.fromSeq(indexedRow._2.toString ++ indexedRow._1.toSeq ))
val list = StructType(Seq(StructField("Row Number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, list).show() // fails
我在 运行.
时遇到以下异常
scala.MatchError: 0 (of class java.lang.Character)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter.apply(CatalystTypeConverters.scala:401)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:492)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:492)
但 structtype 和 rdd 具有预期的输出。
谁能帮我这个?请
我试过 Spark2.10 1.6.0 和 1.6.1 版本
您只有一个小错误,那就是将字符串值添加到字段序列中 - 而不是:
indexedRow._2.toString ++ indexedRow._1.toSeq
你应该使用:
indexedRow._2.toString +: indexedRow._1.toSeq
第一个实现实际上将字符串转换为 Seq[Char]
然后 连接 这两个序列,所以你最终得到的是 Seq('1', '2', "f1Val", "f2Val")
而不是Seq("12", "f1Val", "f2Val")
。您看到的异常是 Spark 试图将第一个 Char
解析为 StringType
但失败了。
查看 this 答案以获得为 rdd 行分配唯一 ID 的更好方法(RDD.zipWithUniqueId)
我想向 Spark(Scala)
中的 DataFrame
添加一个带有行 ID 的新列。这是我采取的方法。我正在创建一个带有索引 ID 的新行和一个包含另一个 StructField
的新 StructType
。
val rdd = df.rdd.zipWithIndex().map(indexedRow => Row.fromSeq(indexedRow._2.toString ++ indexedRow._1.toSeq ))
val list = StructType(Seq(StructField("Row Number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, list).show() // fails
我在 运行.
时遇到以下异常scala.MatchError: 0 (of class java.lang.Character)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter.apply(CatalystTypeConverters.scala:401)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:492)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:492)
但 structtype 和 rdd 具有预期的输出。 谁能帮我这个?请
我试过 Spark2.10 1.6.0 和 1.6.1 版本
您只有一个小错误,那就是将字符串值添加到字段序列中 - 而不是:
indexedRow._2.toString ++ indexedRow._1.toSeq
你应该使用:
indexedRow._2.toString +: indexedRow._1.toSeq
第一个实现实际上将字符串转换为 Seq[Char]
然后 连接 这两个序列,所以你最终得到的是 Seq('1', '2', "f1Val", "f2Val")
而不是Seq("12", "f1Val", "f2Val")
。您看到的异常是 Spark 试图将第一个 Char
解析为 StringType
但失败了。
查看 this 答案以获得为 rdd 行分配唯一 ID 的更好方法(RDD.zipWithUniqueId)