Read whole text files from a compression in Spark

I have the following problem: suppose I have a directory on HDFS containing compressed archives, each of which contains multiple files. I want to create an RDD consisting of objects of some type T, i.e.:

context = new JavaSparkContext(conf);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);

JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String content = fileNameContent._2();

    // Class T has a constructor of taking the filename and the content of each
    // processed file (as two strings)
    T t = new T(content, fileName);

    return t;
});

Now, this works perfectly fine when inputDataPath is a directory containing plain files, i.e. when it's something like:

String inputDataPath =  "hdfs://some_path/*/*/"; // because it contains subfolders

However, when there's a tgz containing multiple files, the file content (fileNameContent._2()) gives me a useless binary string (as expected). I found a similar question on SO about wholeTextFiles, but it's not the same case, because there the solution applies when each compressed archive consists of a single file only, whereas in my case there are many files which I want to read individually as whole files. I also found another related question, but that doesn't work for me either.

Is there any way to do this?

EDIT:

I tried using the reader from here (trying to test the reader from here, like in the function testTarballWithFolders()), but whenever I call

TarballReader tarballReader = new TarballReader(fileName);

I get a NullPointerException:

java.lang.NullPointerException
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
    at utils.TarballReader.<init>(TarballReader.java:61)
    at main.SparkMain.lambda$0(SparkMain.java:105)
    at main.SparkMain$$Lambda/1667100242.call(Unknown Source)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction.apply(JavaPairRDD.scala:1015)
    at scala.collection.Iterator$$anon.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Line 105 in SparkMain is the one I showed above in the edit of this post, and line 61 of TarballReader is

GZIPInputStream gzip = new GZIPInputStream(in);

where the input stream in used on that line turns out to be null:

InputStream in = this.getClass().getResourceAsStream(tarball);

Am I on the right path here? If so, how do I continue? Why do I get this null value, and how can I fix it?

One possible solution is to read the data with binaryFiles and extract the content manually.

Scala:

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until the next entry is null
    .takeWhile(_.isDefined)
    // flatten
    .flatMap(x => x)
    // Drop directories
    .filter(!_.isDirectory)
    .map(e => {
      Stream.continually {
        // Read n bytes
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))}
      // Take as long as we've read something
      .takeWhile(_._1 > 0)
      .map(_._2)
      .flatten
      .toArray})
    .toArray
}

def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = 
  new String(bytes, charset)

sc.binaryFiles("somePath").flatMapValues(x => 
  extractFiles(x).toOption).mapValues(_.map(decode()))

Required sbt dependency:

libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"

Full usage example for Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src
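For completeness, here is a rough Java sketch of the same binaryFiles + commons-compress approach (untested; the class name TarGzExtract, the helper extractFiles, and the path "somePath" are placeholders, not code taken from the linked repository):

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.input.PortableDataStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TarGzExtract {

    // Read every regular file inside a .tar.gz delivered as a PortableDataStream
    // and return its contents as UTF-8 strings.
    public static List<String> extractFiles(PortableDataStream ps) throws IOException {
        List<String> contents = new ArrayList<>();
        try (TarArchiveInputStream tar =
                 new TarArchiveInputStream(new GzipCompressorInputStream(ps.open()))) {
            TarArchiveEntry entry;
            while ((entry = tar.getNextTarEntry()) != null) {
                if (entry.isDirectory()) {
                    continue; // keep only regular files, as in the Scala version
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                int read;
                while ((read = tar.read(buffer, 0, buffer.length)) > 0) {
                    out.write(buffer, 0, read);
                }
                contents.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
            }
        }
        return contents;
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("targz-extract");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // "somePath" is a placeholder, as in the Scala example above.
        JavaPairRDD<String, PortableDataStream> archives = sc.binaryFiles("somePath");
        JavaPairRDD<String, List<String>> files = archives.mapValues(TarGzExtract::extractFiles);
        files.collect();
    }
}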

Python:

import tarfile
from io import BytesIO

def extractFiles(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if x.isfile()]

(sc.binaryFiles("somePath")
    .mapValues(extractFiles)
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))

A slight improvement on the accepted answer is to change

Option(tar.getNextTarEntry)

to

Try(tar.getNextTarEntry).toOption.filter( _ != null)

in order to cope with malformed / truncated .tar.gz archives in a robust way.
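The same guard can be expressed in Java as well; a hypothetical helper (just a sketch, assuming the commons-compress TarArchiveInputStream used in the accepted answer) might look like this:

import java.io.IOException;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;

class RobustTarReading {
    // Mirrors Try(tar.getNextTarEntry).toOption.filter(_ != null): an unreadable
    // (malformed or truncated) entry is treated as the end of the archive rather
    // than failing the whole Spark task.
    static TarArchiveEntry nextEntryOrNull(TarArchiveInputStream tar) {
        try {
            return tar.getNextTarEntry();
        } catch (IOException e) {
            return null;
        }
    }
}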

By the way, is there anything special about the buffer array size? Would it be faster on average if it were closer to the average file size (maybe 500k in my case)? Or is the slowdown I'm seeing more likely the overhead of Stream relative to a while loop, which I guess would be more Java-ish?