从 spark 中保存压缩的 json

saving compressed json from spark

来自 Spark RDD,我想将 JSON 数据暂存并存档到 AWS S3。只有压缩它才有意义,我有一个使用 hadoop 的进程工作 GzipCodec,但有些事情让我对此感到紧张。

当我在这里查看 org.apache.spark.rdd.RDD.saveAsTextFile 的类型签名时:

https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.rdd.RDD

类型签名是:

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

但是当我在这里检查可用的压缩编解码器时:

https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.io.CompressionCodec

父特征CompressionCodec和子类型都说:

The wire protocol for a codec is not guaranteed compatible across versions of Spark. This is intended for use as an internal compression utility within a single Spark application

这不好...但没关系,因为 gzip 可能更容易跨生态系统处理。

类型签名表明编解码器必须是 CompressionCodec 的子类型...但我尝试了以下方法来保存为 .gz,并且它工作正常,即使 hadoop 的 GzipCodec 不是 <: CompressionCodec.

import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsTextFile(bucketName, classOf[GzipCodec])

我的问题:

嗯,对于初学者来说,你是绑定到 RDD 还是可以使用 DataSets/DataFrames?

对于 DataFrames,您可以使用

 df.write.format("json").
    option("compression", "org.apache.hadoop.io.compress.GzipCodec").
    save("...")

但是,有一些注意事项。压缩效果很好,但如果你的文件 生成非常大,你必须记住 gzip 不是可拆分的格式,也就是说, 如果您想稍后处理该文件,则必须由一名工作人员读取。 例如,如果您的文件是不可拆分的并且是 1G,则需要 T 时间来处理,如果它是可拆分的(如 LZO、Snappy 或 BZip2),则可以在 T/N 中处理,其中 N 是数字拆分(假设 128MB 块,那将是大约 8)。 这就是 Hadoop 使用 SequenceFiles(可拆分,并在一个块内使用 gzip)的原因,这就是为什么存储到 S3 时选择的压缩格式通常是 Parquet。 Parquet 文件比 Gzipped 文件小,并且是可分割的,也就是说,它的内容可以由多个 worker 处理。 您仍然可以使用 gzipped 文本文件,但将它们保持在 ~100/200MB 范围内。

归根结底,这实际上取决于您打算如何处理 S3 中的数据。

要查询吗?在这种情况下,Parquet 是更好的格式选择。

其他不理解 parquet 的系统会 read/copied 吗?然后gzip压缩就ok了。而且它很稳定,您不必担心它会发生变化。 你可以自己试试,在S3上保存一些示例数据,你仍然可以用任何gzip工具打开它。