如何通过 Spark 压缩 open/stream .zip 文件?

How to open/stream .zip files through Spark?

我有要打开的 zip 文件 'through' Spark。由于 Hadoops 本机编解码器支持,我可以毫无问题地打开 .gzip 文件,但无法打开 .zip 文件。

有没有一种简单的方法可以在您的 Spark 代码中读取 zip 文件?我还搜索了 zip 编解码器实现以添加到 CompressionCodecFactory,但到目前为止没有成功。

请尝试以下代码:

using API sparkContext.newAPIHadoopRDD(
    hadoopConf,
    InputFormat.class,
    ImmutableBytesWritable.class, Result.class)

@user3591785 指出了正确的方向,所以我将他的回答标记为正确。

关于更多细节,我能够搜索 ZipFileInputFormat Hadoop,并遇到这个 link:http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/

使用 ZipFileInputFormat 及其助手 ZipfileRecordReader class,我能够让 Spark 完美地打开和读取 zip 文件。

    rdd1  = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());

结果是一张只有一个元素的地图。文件名作为键,内容作为值,所以我需要将其转换为 JavaPairRdd。我确定您可以根据需要将 Text 替换为 BytesWritable,并将 ArrayList 替换为其他内容,但我的目标是首先获得一些内容 运行.

JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {

    @Override
    public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
        List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>();

        InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
        BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));

        String line;

        while ((line = br.readLine()) != null) {

        Tuple2 newTuple = new Tuple2(line.split("\t")[0],line);
            newList.add(newTuple);
        }
        return newList;
    }
});
using API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class) 

文件名应该使用 conf

conf=( new Job().getConfiguration())
conf.set(PROPERTY_NAME from your input formatter,"Zip file address")
sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class)

请从您的输入格式化程序中找到 PROPERTY_NAME 以设置路径

python 代码没有解决方案,我最近不得不在 pyspark 中阅读 zips。而且,在搜索如何做到这一点时,我遇到了这个问题。所以,希望这会对其他人有所帮助。

import zipfile
import io

def zip_extract(x):
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    files = [i for i in file_obj.namelist()]
    return dict(zip(files, [file_obj.open(file).read() for file in files]))


zips = sc.binaryFiles("hdfs:/Testing/*.zip")
files_data = zips.map(zip_extract).collect()

在上面的代码中,我返回了一个字典,其中以 zip 中的文件名作为键,以每个文件中的文本数据作为值。您可以根据自己的目的更改它。

我遇到过类似的问题,我已经用下面的代码解决了

sparkContext.binaryFiles("/pathToZipFiles/*")
.flatMap { case (zipFilePath, zipContent) =>

        val zipInputStream = new ZipInputStream(zipContent.open())

        Stream.continually(zipInputStream.getNextEntry)
        .takeWhile(_ != null)
        .flatMap { zipEntry => ??? }
    }

本回答仅收集前人知识,分享我的经验

ZipFileInputFormat

我尝试在我的生产集群上使用 and answers, and use imported ZipFileInputFormat together with sc.newAPIHadoopFile API. But this did not work for me. And I do not know how would I put com-cotdp-hadoop 库。我不负责设置。

ZipInputStream

给了一个很好的建议,但他没有完成他的回答,我花了很长时间才真正得到解压输出。

当我能够这样做的时候,我必须准备好所有的理论方面,你可以在我的回答中找到:

但是上述答案中缺少的部分是阅读 ZipEntry:

import java.util.zip.ZipInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;   

sc.binaryFiles(path, minPartitions)
      .flatMap { case (name: String, content: PortableDataStream) =>
        val zis = new ZipInputStream(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}
 

尝试:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.text("yourGzFile.gz")