如何通过 Spark 压缩 open/stream .zip 文件?
How to open/stream .zip files through Spark?
我有要打开的 zip 文件 'through' Spark。由于 Hadoops 本机编解码器支持,我可以毫无问题地打开 .gzip 文件,但无法打开 .zip 文件。
有没有一种简单的方法可以在您的 Spark 代码中读取 zip 文件?我还搜索了 zip 编解码器实现以添加到 CompressionCodecFactory,但到目前为止没有成功。
请尝试以下代码:
using API sparkContext.newAPIHadoopRDD(
hadoopConf,
InputFormat.class,
ImmutableBytesWritable.class, Result.class)
@user3591785 指出了正确的方向,所以我将他的回答标记为正确。
关于更多细节,我能够搜索 ZipFileInputFormat Hadoop,并遇到这个 link:http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/
使用 ZipFileInputFormat 及其助手 ZipfileRecordReader class,我能够让 Spark 完美地打开和读取 zip 文件。
rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());
结果是一张只有一个元素的地图。文件名作为键,内容作为值,所以我需要将其转换为 JavaPairRdd。我确定您可以根据需要将 Text 替换为 BytesWritable,并将 ArrayList 替换为其他内容,但我的目标是首先获得一些内容 运行.
JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>();
InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String line;
while ((line = br.readLine()) != null) {
Tuple2 newTuple = new Tuple2(line.split("\t")[0],line);
newList.add(newTuple);
}
return newList;
}
});
using API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class)
文件名应该使用 conf
conf=( new Job().getConfiguration())
conf.set(PROPERTY_NAME from your input formatter,"Zip file address")
sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class)
请从您的输入格式化程序中找到 PROPERTY_NAME
以设置路径
python 代码没有解决方案,我最近不得不在 pyspark 中阅读 zips。而且,在搜索如何做到这一点时,我遇到了这个问题。所以,希望这会对其他人有所帮助。
import zipfile
import io
def zip_extract(x):
in_memory_data = io.BytesIO(x[1])
file_obj = zipfile.ZipFile(in_memory_data, "r")
files = [i for i in file_obj.namelist()]
return dict(zip(files, [file_obj.open(file).read() for file in files]))
zips = sc.binaryFiles("hdfs:/Testing/*.zip")
files_data = zips.map(zip_extract).collect()
在上面的代码中,我返回了一个字典,其中以 zip 中的文件名作为键,以每个文件中的文本数据作为值。您可以根据自己的目的更改它。
我遇到过类似的问题,我已经用下面的代码解决了
sparkContext.binaryFiles("/pathToZipFiles/*")
.flatMap { case (zipFilePath, zipContent) =>
val zipInputStream = new ZipInputStream(zipContent.open())
Stream.continually(zipInputStream.getNextEntry)
.takeWhile(_ != null)
.flatMap { zipEntry => ??? }
}
本回答仅收集前人知识,分享我的经验
ZipFileInputFormat
我尝试在我的生产集群上使用 and answers, and use imported ZipFileInputFormat
together with sc.newAPIHadoopFile
API. But this did not work for me. And I do not know how would I put com-cotdp-hadoop 库。我不负责设置。
ZipInputStream
给了一个很好的建议,但他没有完成他的回答,我花了很长时间才真正得到解压输出。
当我能够这样做的时候,我必须准备好所有的理论方面,你可以在我的回答中找到:
但是上述答案中缺少的部分是阅读 ZipEntry
:
import java.util.zip.ZipInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
sc.binaryFiles(path, minPartitions)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}}
尝试:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.text("yourGzFile.gz")
我有要打开的 zip 文件 'through' Spark。由于 Hadoops 本机编解码器支持,我可以毫无问题地打开 .gzip 文件,但无法打开 .zip 文件。
有没有一种简单的方法可以在您的 Spark 代码中读取 zip 文件?我还搜索了 zip 编解码器实现以添加到 CompressionCodecFactory,但到目前为止没有成功。
请尝试以下代码:
using API sparkContext.newAPIHadoopRDD(
hadoopConf,
InputFormat.class,
ImmutableBytesWritable.class, Result.class)
@user3591785 指出了正确的方向,所以我将他的回答标记为正确。
关于更多细节,我能够搜索 ZipFileInputFormat Hadoop,并遇到这个 link:http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/
使用 ZipFileInputFormat 及其助手 ZipfileRecordReader class,我能够让 Spark 完美地打开和读取 zip 文件。
rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());
结果是一张只有一个元素的地图。文件名作为键,内容作为值,所以我需要将其转换为 JavaPairRdd。我确定您可以根据需要将 Text 替换为 BytesWritable,并将 ArrayList 替换为其他内容,但我的目标是首先获得一些内容 运行.
JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>();
InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
String line;
while ((line = br.readLine()) != null) {
Tuple2 newTuple = new Tuple2(line.split("\t")[0],line);
newList.add(newTuple);
}
return newList;
}
});
using API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class)
文件名应该使用 conf
conf=( new Job().getConfiguration())
conf.set(PROPERTY_NAME from your input formatter,"Zip file address")
sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class)
请从您的输入格式化程序中找到 PROPERTY_NAME
以设置路径
python 代码没有解决方案,我最近不得不在 pyspark 中阅读 zips。而且,在搜索如何做到这一点时,我遇到了这个问题。所以,希望这会对其他人有所帮助。
import zipfile
import io
def zip_extract(x):
in_memory_data = io.BytesIO(x[1])
file_obj = zipfile.ZipFile(in_memory_data, "r")
files = [i for i in file_obj.namelist()]
return dict(zip(files, [file_obj.open(file).read() for file in files]))
zips = sc.binaryFiles("hdfs:/Testing/*.zip")
files_data = zips.map(zip_extract).collect()
在上面的代码中,我返回了一个字典,其中以 zip 中的文件名作为键,以每个文件中的文本数据作为值。您可以根据自己的目的更改它。
我遇到过类似的问题,我已经用下面的代码解决了
sparkContext.binaryFiles("/pathToZipFiles/*")
.flatMap { case (zipFilePath, zipContent) =>
val zipInputStream = new ZipInputStream(zipContent.open())
Stream.continually(zipInputStream.getNextEntry)
.takeWhile(_ != null)
.flatMap { zipEntry => ??? }
}
本回答仅收集前人知识,分享我的经验
ZipFileInputFormat
我尝试在我的生产集群上使用 ZipFileInputFormat
together with sc.newAPIHadoopFile
API. But this did not work for me. And I do not know how would I put com-cotdp-hadoop 库。我不负责设置。
ZipInputStream
当我能够这样做的时候,我必须准备好所有的理论方面,你可以在我的回答中找到:
但是上述答案中缺少的部分是阅读 ZipEntry
:
import java.util.zip.ZipInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
sc.binaryFiles(path, minPartitions)
.flatMap { case (name: String, content: PortableDataStream) =>
val zis = new ZipInputStream(content.open)
Stream.continually(zis.getNextEntry)
.takeWhile(_ != null)
.flatMap { _ =>
val br = new BufferedReader(new InputStreamReader(zis))
Stream.continually(br.readLine()).takeWhile(_ != null)
}}
尝试:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.text("yourGzFile.gz")