Dataproc 无法解压缩 AWS Kinesis 压缩的 .gz 文件

Dataproc cannot unzip .gz file zipped by AWS Kinesis

我的公司正在尝试将服务从 AWS 迁移到 GCP。我们面临一些问题。 AWS Kinesis 收集的数据是 .gz 个文件。我们使用 GCP 的 Cloud Storage 将这些文件传输到 GCP 平台,并使用 Dataproc 来处理这些数据。所有这些数据都可以在AWS中正确处理,但不能被同一个Spark作业正确读取。

见最后抛出的异常。

我试图在 GCP Cloud Shell 中解压缩这些文件之一,例如 ABC.gz。解压缩后的文件仍然以 .gz: ABC.gz 结尾。我认为这是根本原因,因为 Spark 可能会尝试解压缩已解压缩的文件。

如果我们通过删除 .gz 后缀来重命名这些文件,那么 Spark 可以 运行 正常。但是重命名过程太耗时,处理一天的数据需要几个小时以上。

非常感谢任何建议。提前致谢。

Caused by: java.io.IOException: incorrect header check
  at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
  at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
  at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
  at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
  at java.io.InputStream.read(InputStream.java:101)
  at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:151)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:191)
  at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
  at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:109)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.nextIterator(FileScanRDD.scala:190)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon.hasNext(FileScanRDD.scala:109)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:6
31)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:253)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:836)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:836)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

没有进一步的细节,很难说到底发生了什么,但很可能你存储 .gz 个未压缩的文件或使用 GCS decompressive transcoding。这意味着 Spark 读取的文件已经解压(它们首先没有被压缩,或者如果使用 GCS 解压转码,则由 HTTP 客户端库解压)这会导致失败,因为 Hadoop/Spark 将自动尝试使用 .gz 扩展也是。

如果以上为真,那么除了重命名这些文件以删除 .gz 扩展名之外,您似乎别无选择。另请注意,在 Spark/Hadoop 中处理 Gzip 压缩文件效率低下,因为它们不可拆分。

这是由于 header 由 Kinesis 用 object 编写的。 Kinesis 将 Content-Encoding=gzip 元数据添加到 object,当传输请求 object 时,它被添加到响应 header,这会导致 [=19] 自动解压=] 传输时。

您可以删除此 header 或下载后删除扩展。

我相信,如果您使用 PowerShell 和 AWS SDK 下载文件,那么它不会自动解压缩文件。