NULL 指针异常,同时在 foreach() 中创建 DF
NULL Pointer Exception, while creating DF inside foreach()
我必须从 S3 读取某些文件,因此我创建了一个包含这些文件在 S3 上的路径的 CSV。我正在使用以下代码读取创建的 CSV 文件:
val listofFilesRDD = sparkSession.read.textFile("s3://"+ file)
这工作正常。
然后我试图读取每条路径并创建数据框,如:
listofFilesRDD.foreach(iter => {
val pathDF = sparkSession.read
.schema(testSchema)
.option("headers", true)
.csv("s3://"+iter)
pathDF.printSchema()
})
但是,上面的代码给出了 NullPointerException。
那么,我该如何修复上面的代码?
您不能访问 RDD 内部的 RDD!这是唯一的规则!您必须做些其他事情才能使您的逻辑正常工作!
您可以在这里找到更多相关信息:NullPointerException in Scala Spark, appears to be caused be collection type?
您可以解决上述问题,如下所示,您只需创建 s3 文件路径数组并遍历该数组并在其中创建 DF,如下所示
val listofFilesRDD = sparkSession.read.textFile("s3://"+ file)
val listOfPaths = listofFilesRDD.collect()
listOfPaths.foreach(iter => {
val pathDF = sparkSession.read
.schema(testSchema)
.option("headers", true)
.csv("s3://"+iter)
pathDF.printSchema()
})
如果有人遇到DataFrame问题,可以解决这个问题。
def parameterjsonParser(queryDF:DataFrame,spark:SparkSession): Unit ={
queryDF.show()
val otherDF=queryDF.collect()
otherDF.foreach { row =>
row.toSeq.foreach { col =>
println(col)
mainJsonParser(col.toString,spark)
}
}
谢谢@Sandeep Purohit
我必须从 S3 读取某些文件,因此我创建了一个包含这些文件在 S3 上的路径的 CSV。我正在使用以下代码读取创建的 CSV 文件:
val listofFilesRDD = sparkSession.read.textFile("s3://"+ file)
这工作正常。 然后我试图读取每条路径并创建数据框,如:
listofFilesRDD.foreach(iter => {
val pathDF = sparkSession.read
.schema(testSchema)
.option("headers", true)
.csv("s3://"+iter)
pathDF.printSchema()
})
但是,上面的代码给出了 NullPointerException。
那么,我该如何修复上面的代码?
您不能访问 RDD 内部的 RDD!这是唯一的规则!您必须做些其他事情才能使您的逻辑正常工作!
您可以在这里找到更多相关信息:NullPointerException in Scala Spark, appears to be caused be collection type?
您可以解决上述问题,如下所示,您只需创建 s3 文件路径数组并遍历该数组并在其中创建 DF,如下所示
val listofFilesRDD = sparkSession.read.textFile("s3://"+ file)
val listOfPaths = listofFilesRDD.collect()
listOfPaths.foreach(iter => {
val pathDF = sparkSession.read
.schema(testSchema)
.option("headers", true)
.csv("s3://"+iter)
pathDF.printSchema()
})
如果有人遇到DataFrame问题,可以解决这个问题。
def parameterjsonParser(queryDF:DataFrame,spark:SparkSession): Unit ={
queryDF.show()
val otherDF=queryDF.collect()
otherDF.foreach { row =>
row.toSeq.foreach { col =>
println(col)
mainJsonParser(col.toString,spark)
}
}
谢谢@Sandeep Purohit