Spark job failed with exception while saving DataFrame contents as CSV files using Spark SQL

I am trying to save DataFrame contents to HDFS in CSV format. It works fine with a small number of files, but when I try it with more files (90+), I get a NullPointerException and the job fails. Below is my code:

import org.apache.spark.sql.functions.{col, udf}

val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "false").option("delimiter", "|").load("hdfs path for loading multiple files/*");

val mydateFunc = udf {(x: String) => x.split("/") match {case Array(month,date,year) => year+"-"+month+"-"+date case Array(y)=> y}}

val df2 = df1.withColumn("orderdate", mydateFunc(df1("Date on which the record was created"))).drop("Date on which the record was created")

val df3 = df2.withColumn("deliverydate", mydateFunc(df2("Requested delivery date"))).drop("Requested delivery date")

val exp = "(.*)(44000\d{5}|69499\d{6})(.*)".r


val upc_extractor: (String => String) = (arg: String) => arg match { case exp(pref,required,suffx) => required case x:String => x }

val sqlfunc = udf(upc_extractor)

val df4 = df3.withColumn("formatted_UPC", sqlfunc(col("European Article Numbers/Universal Produ")))

df4.write.format("com.databricks.spark.csv").option("header", "false").save("destination path in hdfs to save the resultant files");

Below is the exception I am getting:

16/02/03 01:59:15 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/02/03 01:59:33 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
java.lang.NullPointerException
        at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:30)
        at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:30)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun.apply(ScalaUdf.scala:71)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun.apply(ScalaUdf.scala:70)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$$anon.next(package.scala:165)
        at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$$anon.next(package.scala:158)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply$mcV$sp(PairRDDFunctions.scala:1109)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1116)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1095)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/02/03 01:59:33 INFO TaskSetManager: Starting task 32.0 in stage 1.0 (TID 33, localhost, ANY, 1692 bytes)
16/02/03 01:59:33 INFO Executor: Running task 32.0 in stage 1.0 (TID 33)
16/02/03 01:59:33 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 3, localhost): java.lang.NullPointerException
        at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:30)
        at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:30)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun.apply(ScalaUdf.scala:71)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun.apply(ScalaUdf.scala:70)
        at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$$anon.next(package.scala:165)
        at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$$anon.next(package.scala:158)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply$mcV$sp(PairRDDFunctions.scala:1109)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1108)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1116)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1095)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

16/02/03 01:59:33 ERROR TaskSetManager: Task 2 in stage 1.0 failed 1 times; aborting job
16/02/03 01:59:33 INFO TaskSchedulerImpl: Cancelling stage 1
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 29.0 in stage 1.0 (TID 30)
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 8.0 in stage 1.0 (TID 9)
16/02/03 01:59:33 INFO TaskSchedulerImpl: Stage 1 was cancelled
16/02/03 01:59:33 INFO Executor: Executor is trying to kill task 0.0 in stage 1.0 (TID 1)

The Spark version is 1.4.1. Any help is much appreciated.

Probably one of your files has bad input. The first thing to do is to find that file. Once you have found it, you can try to locate the line that causes the problem. When you have that line, look at it closely and you will probably see what is wrong. My guess is that the number of columns does not match what is expected, or maybe something is not escaped properly. If you still cannot find it, you can update the question with the content of that file.
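One way to narrow this down is to load and transform the files one at a time, so the failure surfaces together with a specific file name. A rough sketch of that idea (the /data/input path is only a placeholder for your actual HDFS directory; mydateFunc is the udf from the question):

import org.apache.hadoop.fs.{FileSystem, Path}

// List the input files individually instead of loading them all with a wildcard.
val fs = FileSystem.get(sc.hadoopConfiguration)
val files = fs.listStatus(new Path("/data/input")).map(_.getPath.toString)

files.foreach { file =>
  try {
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter", "|")
      .load(file)
    // Force the same udf to run on this file only, so a bad file fails here.
    df.select(mydateFunc(df("Date on which the record was created"))).rdd.count()
    println(s"$file is fine")
  } catch {
    case e: Exception => println(s"$file failed: ${e.getMessage}")
  }
}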

After adding an if condition to the udf mydateFunc to filter out the null values that were causing the NPE, the code runs fine and I am able to load all the files.

val mydateFunc = udf { (x: String) => if (x == null) x else x.split("/") match { case Array(month, date, year) => year + "-" + month + "-" + date case Array(y) => y } }
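The other udf in the question, upc_extractor, may run into the same kind of problem if the "European Article Numbers/Universal Produ" column also contains nulls. A minimal sketch of the same guard applied there (with the exp regex written as a triple-quoted string so the \d escapes survive):

val exp = """(.*)(44000\d{5}|69499\d{6})(.*)""".r

// Pass null through untouched; otherwise extract the UPC as before.
val upc_extractor: (String => String) = (arg: String) =>
  if (arg == null) arg
  else arg match {
    case exp(pref, required, suffx) => required
    case x => x
  }

val sqlfunc = udf(upc_extractor)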