Number of tuples limit in RDD; reading RDD throws ArrayIndexOutOfBoundsException
I tried to convert a DataFrame of a table with 25 columns into an RDD. That is when I learned that Scala (up to 2.11.8) has a limit of at most 22 elements in a tuple.
val rdd = sc.textFile("/user/hive/warehouse/myDB.db/myTable/")
rdd: org.apache.spark.rdd.RDD[String] = /user/hive/warehouse/myDB.db/myTable/ MapPartitionsRDD[3] at textFile at <console>:24
Sample data:
[2017-02-26, 100052-ACC, 100052, 3260, 1005, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000]
Accessing each column:
val rdd3 = rdd.map(elements => {
val el = elements.split(",")
(el(0).substring(1,11).toString, el(1).toString ,el(2).toInt, el(3).toInt, el(4).toInt, el(5).sum.toDouble, el(6).sum.toDouble, el(7).sum.toDouble, el(8).sum.toDouble, el(9).sum.toDouble, el(10).sum.toDouble, el(11).sum.toDouble, el(12).sum.toDouble, el(13).sum.toDouble, el(14).sum.toDouble, el(15).sum.toDouble, el(15).sum.toDouble, el(17).sum.toDouble, el(18).sum.toDouble, el(19).sum.toDouble, el(20).sum.toDouble, el(21).sum.toDouble, el(22).sum.toDouble, el(23).sum.toDouble, el(24).sum.toDouble)
}
)
It throws an error:
<console>:1: error: too many elements for tuple: 26, allowed: 22
This is a known issue in Scala (https://issues.scala-lang.org/browse/SI-9572), so I created a case class to work around it.
case class HandleMaxTuple(col1:String, col2:String, col3: Int, col4: Int, col5: Int, col6: Double, col7: Double, col8: Double, col9: Double, col10: Double, col11: Double, col12: Double, col13: Double, col14: Double, col15: Double, col16: Double, col17: Double, col18: Double, col19: Double, col20: Double, col21: Double, col22: Double, col23: Double, col24: Double, col25:Double)
So the new RDD definition becomes:
val rdd3 = rdd.map(elements => {
val el = elements.split(",")
(HandleMaxTuple(el(0).substring(1,11).toString, el(1).toString,el(2).toInt, el(3).toInt, el(4).toInt, el(5).toDouble, el(6).toDouble, el(7).toDouble, el(8).toDouble, el(9).toDouble, el(10).toDouble, el(11).toDouble, el(12).toDouble, el(13).toDouble, el(14).toDouble, el(15).toDouble, el(15).toDouble, el(17).toDouble, el(18).toDouble, el(19).toDouble, el(20).toDouble, el(21).toDouble, el(22).toDouble, el(23).toDouble, el(24).toDouble))
}
)
However, when I try to read the contents of the RDD:
rdd.take(2).foreach(println)
it throws a java.lang.ArrayIndexOutOfBoundsException.
Stack trace:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.rdd.RDD$$anonfun$take.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
... 48 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
Any idea why this happens? Is there a workaround?
I tried to do exactly the same thing with a case class on your data, and I found two problems. The working code first:
package com.scalaspark.Whosebug

import org.apache.spark.sql.SparkSession

object Whosebug {
  def main(args: Array[String]): Unit = {

    // Parse one comma-separated line into the 25-column case class.
    def parser(line: String): HandleMaxTuple = {
      val fields = line.split(",")
      val c1 = fields(0).substring(0, 10) // zero-based: first 10 chars, e.g. "2017-02-26"
      val c2 = fields(1)
      val c3 = fields(2).replaceAll("\\s", "").toInt
      val c4 = fields(3).replaceAll("\\s", "").toInt
      val c5 = fields(4).replaceAll("\\s", "").toInt
      val c6 = fields(5).replaceAll("\\s", "").toDouble
      val c7 = fields(6).replaceAll("\\s", "").toDouble
      val c8 = fields(7).replaceAll("\\s", "").toDouble
      val c9 = fields(8).replaceAll("\\s", "").toDouble
      val c10 = fields(9).replaceAll("\\s", "").toDouble
      val c11 = fields(10).replaceAll("\\s", "").toDouble
      val c12 = fields(11).replaceAll("\\s", "").toDouble
      val c13 = fields(12).replaceAll("\\s", "").toDouble
      val c14 = fields(13).replaceAll("\\s", "").toDouble
      val c15 = fields(14).replaceAll("\\s", "").toDouble
      val c16 = fields(15).replaceAll("\\s", "").toDouble
      val c17 = fields(16).replaceAll("\\s", "").toDouble
      val c18 = fields(17).replaceAll("\\s", "").toDouble
      val c19 = fields(18).replaceAll("\\s", "").toDouble
      val c20 = fields(19).replaceAll("\\s", "").toDouble
      val c21 = fields(20).replaceAll("\\s", "").toDouble
      val c22 = fields(21).replaceAll("\\s", "").toDouble
      val c23 = fields(22).replaceAll("\\s", "").toDouble
      val c24 = fields(23).replaceAll("\\s", "").toDouble
      val c25 = fields(24).replaceAll("\\s", "").toDouble
      HandleMaxTuple(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19, c20, c21, c22, c23, c24, c25)
    }

    val spark = SparkSession
      .builder()
      .appName("number of tuples limit in RDD")
      .master("local[*]")
      .getOrCreate()

    val lines = spark.sparkContext.textFile("C:\\Users\\rajnish.kumar\\Desktop\\sampleData.txt", 1)
    lines.foreach(println)

    val parseddata = lines.map(parser)
    parseddata.foreach(println)
  }

  case class HandleMaxTuple(col1: String, col2: String, col3: Int, col4: Int, col5: Int, col6: Double, col7: Double, col8: Double, col9: Double, col10: Double, col11: Double, col12: Double, col13: Double, col14: Double, col15: Double, col16: Double, col17: Double, col18: Double, col19: Double, col20: Double, col21: Double, col22: Double, col23: Double, col24: Double, col25: Double)
}
The first problem is the substring() you call on el(0). According to the Java documentation its signature is:
String substring(int beginIndex, int endIndex)
Returns a new string that is a substring of this string.
When I used el(0).substring(1,11), I got java.lang.StringIndexOutOfBoundsException: String index out of range: 11. So go with el(0).substring(0,10) instead (because indices start at zero, not at 1).
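For illustration, a minimal REPL sketch of the indexing difference, assuming the first field is the bare 10-character date "2017-02-26" (no leading bracket):
scala> val el0 = "2017-02-26"
el0: String = 2017-02-26
scala> el0.substring(0, 10)   // zero-based beginIndex, endIndex is exclusive
res0: String = 2017-02-26
scala> el0.substring(1, 11)   // endIndex 11 is past the end of a 10-character string
java.lang.StringIndexOutOfBoundsException: String index out of range: 11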
The second problem: you convert some fields with toInt and toDouble, but as far as I can see they all start with a space, so be aware that this can fail with a NumberFormatException, just as it would in Java, like this:
scala> val i = "foo".toInt
java.lang.NumberFormatException: For input string: "foo"
For more details see https://alvinalexander.com/scala/how-cast-string-to-int-in-scala-string-int-conversion. To correct it I used .replaceAll("\\s",""), which removes the whitespace in front of the numbers before they are converted to Int and Double.
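A quick sketch of that failure and the fix, assuming a field that still carries its leading space (e.g. " 100052" from the sample row):
scala> " 100052".toInt
java.lang.NumberFormatException: For input string: " 100052"
scala> " 100052".replaceAll("\\s", "").toInt   // strip whitespace first, then convert
res0: Int = 100052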
When you run the example above, you get this output:
HandleMaxTuple(2017-02-26, 100052-ACC,100052,3260,1005,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0)