为什么读取具有空值的 csv 文件会导致 IndexOutOfBoundException?

Why does reading csv file with empty values lead to IndexOutOfBoundException?

我有一个包含以下结构的 csv 文件

Name | Val1 | Val2 | Val3 | Val4 | Val5
John     1      2
Joe      1      2
David    1      2            10    11

我可以将其加载到 RDD 中。我尝试创建一个模式,然后从中创建一个 Dataframe 并得到一个 indexOutOfBound 错误。

代码是这样的...

val rowRDD = fileRDD.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6) )

当我尝试对 rowRDD 执行操作时,出现错误。

非常感谢任何帮助。

这不是您问题的答案。但它可能有助于解决您的问题。

从问题中我看到您正在尝试从 CSV 创建数据框。

使用 spark-csv

可以轻松地使用 CSV 创建数据框

使用scala代码下面的spark-csv可以用来读取CSV val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(csvFilePath)

对于您的示例数据,我得到了以下结果

+-----+----+----+----+----+----+
| Name|Val1|Val2|Val3|Val4|Val5|
+-----+----+----+----+----+----+
| John|   1|   2|    |    |    |
|  Joe|   1|   2|    |    |    |
|David|   1|   2|    |  10|  11|
+-----+----+----+----+----+----+

您还可以使用最新版本推断架构。看到这个

如果 CSV 文件包含固定数量的列并且您的 CVS 看起来像这样(请注意用自己的逗号分隔的空字段),则空值不是问题:

David,1,2,10,,11

问题是您的 CSV 文件包含 6 列,但是:

val rowRDD = fileRDD.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6) )

您尝试阅读 7 列。只需将您的映射更改为:

val rowRDD = fileRDD.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5))

剩下的交给 Spark 处理。

该问题的可能解决方案是用 Double.NaN 替换缺失值。假设我有一个文件 example.csv,其中包含列

David,1,2,10,,11

您可以将 csv 文件作为文本文件阅读,如下所示

fileRDD=sc.textFile(example.csv).map(x=> {val y=x.split(","); val z=y.map(k=> if(k==""){Double.NaN}else{k.toDouble()})})

然后您可以使用您的代码从中创建数据框

您可以按如下方式进行。

val df = sqlContext
         .read
         .textfile(csvFilePath)
         .map(_.split(delimiter_of_file, -1)
         .map(
             p => 
              Row(
                p(0), 
                p(1),
                p(2),
                p(3),
                p(4),
                p(5),
                p(6))

使用文件的分隔符拆分。当您将 -1 设置为限制时,它会考虑所有空字段。