Apache Spark can't read parquet folder that is being written with streaming job
When I try to read a parquet folder that is currently being written to by another Spark streaming job, using the option "mergeSchema":"true", I get the error:
java.io.IOException: Could not read footer for file
val df = spark
.read
.option("mergeSchema", "true")
.parquet("path.parquet")
Without schema merging I can read the folder just fine, but is it possible to read such a folder with schema merging enabled, regardless of the other job that may be concurrently updating it?

Full exception:
java.io.IOException: Could not read footer for file: FileStatus{path=hdfs://path.parquet/part-00000-20199ef6-4ff8-4ee0-93cc-79d47d2da37d-c000.snappy.parquet; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel.apply(ParquetFileFormat.scala:551)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel.apply(ParquetFileFormat.scala:538)
at org.apache.spark.util.ThreadUtils$$anonfun$$anonfun$apply.apply(ThreadUtils.scala:287)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: hdfs://path.parquet/part-00000-20199ef6-4ff8-4ee0-93cc-79d47d2da37d-c000.snappy.parquet is not a Parquet file (too small length: 0)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel.apply(ParquetFileFormat.scala:544)
... 9 more
Run the following before creating the dataframe:
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
i.e. enable this config - spark.sql.files.ignoreCorruptFiles

As stated here, if this config is true, the Spark jobs will continue to run when encountering corrupted or non-existing files, and contents that have been read will still be returned. This config is also used by the merge-schema flow.

It is available from Spark 2.1.1+.
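A minimal sketch of the full flow, assuming an existing SparkSession and the same "path.parquet" folder from the question (the app name is just a placeholder); the config can be set either via spark.sql(...) as above or directly through spark.conf:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("read-while-streaming")   // hypothetical app name
  .getOrCreate()

// Skip zero-length / half-written part files instead of failing on the footer read
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// With the config enabled, schema merging no longer aborts on in-progress files
val df = spark.read
  .option("mergeSchema", "true")
  .parquet("path.parquet")

Keep in mind that ignored files mean any rows still being written by the streaming job are simply skipped in this read, so the result may not include the very latest data.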