写一个带有增量编码列的镶木地板文件
Write a parquet file with delta encoded coulmns
我正在尝试使用 delta 编码编写镶木地板文件。
This page,指出 parquet 支持三种类型的增量编码:
(DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY).
由于spark
、pyspark
或pyarrow
不允许我们指定编码方法,我很好奇如何编写启用了delta编码的文件?
但是,我在互联网上发现,如果我有 TimeStamp
类型的列,parquet 将使用增量编码。
所以我在 scala
中使用了以下代码来创建一个 parquet 文件。但编码不是增量。
val df = Seq(("2018-05-01"),
("2018-05-02"),
("2018-05-03"),
("2018-05-04"),
("2018-05-05"),
("2018-05-06"),
("2018-05-07"),
("2018-05-08"),
("2018-05-09"),
("2018-05-10")
).toDF("Id")
val df2 = df.withColumn("Timestamp", (col("Id").cast("timestamp")))
val df3 = df2.withColumn("Date", (col("Id").cast("date")))
df3.coalesce(1).write.format("parquet").mode("append").save("date_time2")
parquet-tools
显示有关写入的 parquet 文件的以下信息。
file schema: spark_schema
--------------------------------------------------------------------------------
Id: OPTIONAL BINARY L:STRING R:0 D:1
Timestamp: OPTIONAL INT96 R:0 D:1
Date: OPTIONAL INT32 L:DATE R:0 D:1
row group 1: RC:31 TS:1100 OFFSET:4
--------------------------------------------------------------------------------
Id: BINARY SNAPPY DO:0 FPO:4 SZ:230/487/2.12 VC:31 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 2018-05-01, max: 2018-05-31, num_nulls: 0]
Timestamp: INT96 SNAPPY DO:0 FPO:234 SZ:212/436/2.06 VC:31 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
Date: INT32 SNAPPY DO:0 FPO:446 SZ:181/177/0.98 VC:31 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 2018-05-01, max: 2018-05-31, num_nulls: 0]
如您所见,没有列使用增量编码。
我的问题是:
如何使用增量编码编写 parquet 文件? (如果你可以的话
在 scala
或 python
中提供示例代码会很棒。)
如何决定使用哪个 "delta encoding": (DELTA_BINARY_PACKED,
DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY)
?
在使用 PySpark 生成 parquet 文件时弄清楚如何启用 DELTA 编码真的很有挑战性。
我们生成了大量数字数据,使用 DELTA 编码可以真正受益。在我的测试中,我能够使用 DELTA 编码将一个 136.9MB 的小测试文件减少到 101.6MB。对于我们的用例,我们生成了数 TB 的数据,因此未来 S3 的节省值得考虑。
我的经验是使用 EMR 5.29.0 的 Spark 2.4.5。在生成 DELTA 编码文件之前和之后,我遇到了很多问题。我会提及它们,以便您了解这些问题并且不会绊倒自己。
为了在 PySpark 中生成 DELTA 编码的 parquet 文件,我们需要启用 Parquet 写入的版本 2。这是它工作的唯一方式。此外,由于某种原因,该设置仅在创建 spark 上下文时有效。设置为:
"spark.hadoop.parquet.writer.version": "v2"
结果是:
time: INT64 GZIP DO:0 FPO:11688 SZ:84010/2858560/34.03 VC:15043098 ENC:DELTA_BINARY_PACKED ST:[min: 1577715561210, max: 1577839907009, num_nulls: 0]
但是,您无法像往常一样在 PySpark 中读取相同的文件
java.lang.UnsupportedOperationException: Unsupported encoding: DELTA_BINARY_PACKED
为了读回文件,需要禁用以下配置:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
另外,一个悲伤但额外的提示,Pandas 在撰写本文时不会读取这些文件 post。
Parquet files v2.0 created by spark can't be read by pyarrow
我正在尝试使用 delta 编码编写镶木地板文件。 This page,指出 parquet 支持三种类型的增量编码:
(DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY).
由于spark
、pyspark
或pyarrow
不允许我们指定编码方法,我很好奇如何编写启用了delta编码的文件?
但是,我在互联网上发现,如果我有 TimeStamp
类型的列,parquet 将使用增量编码。
所以我在 scala
中使用了以下代码来创建一个 parquet 文件。但编码不是增量。
val df = Seq(("2018-05-01"),
("2018-05-02"),
("2018-05-03"),
("2018-05-04"),
("2018-05-05"),
("2018-05-06"),
("2018-05-07"),
("2018-05-08"),
("2018-05-09"),
("2018-05-10")
).toDF("Id")
val df2 = df.withColumn("Timestamp", (col("Id").cast("timestamp")))
val df3 = df2.withColumn("Date", (col("Id").cast("date")))
df3.coalesce(1).write.format("parquet").mode("append").save("date_time2")
parquet-tools
显示有关写入的 parquet 文件的以下信息。
file schema: spark_schema
--------------------------------------------------------------------------------
Id: OPTIONAL BINARY L:STRING R:0 D:1
Timestamp: OPTIONAL INT96 R:0 D:1
Date: OPTIONAL INT32 L:DATE R:0 D:1
row group 1: RC:31 TS:1100 OFFSET:4
--------------------------------------------------------------------------------
Id: BINARY SNAPPY DO:0 FPO:4 SZ:230/487/2.12 VC:31 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 2018-05-01, max: 2018-05-31, num_nulls: 0]
Timestamp: INT96 SNAPPY DO:0 FPO:234 SZ:212/436/2.06 VC:31 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
Date: INT32 SNAPPY DO:0 FPO:446 SZ:181/177/0.98 VC:31 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 2018-05-01, max: 2018-05-31, num_nulls: 0]
如您所见,没有列使用增量编码。
我的问题是:
如何使用增量编码编写 parquet 文件? (如果你可以的话 在
scala
或python
中提供示例代码会很棒。)如何决定使用哪个 "delta encoding":
(DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY)
?
在使用 PySpark 生成 parquet 文件时弄清楚如何启用 DELTA 编码真的很有挑战性。
我们生成了大量数字数据,使用 DELTA 编码可以真正受益。在我的测试中,我能够使用 DELTA 编码将一个 136.9MB 的小测试文件减少到 101.6MB。对于我们的用例,我们生成了数 TB 的数据,因此未来 S3 的节省值得考虑。
我的经验是使用 EMR 5.29.0 的 Spark 2.4.5。在生成 DELTA 编码文件之前和之后,我遇到了很多问题。我会提及它们,以便您了解这些问题并且不会绊倒自己。
为了在 PySpark 中生成 DELTA 编码的 parquet 文件,我们需要启用 Parquet 写入的版本 2。这是它工作的唯一方式。此外,由于某种原因,该设置仅在创建 spark 上下文时有效。设置为:
"spark.hadoop.parquet.writer.version": "v2"
结果是:
time: INT64 GZIP DO:0 FPO:11688 SZ:84010/2858560/34.03 VC:15043098 ENC:DELTA_BINARY_PACKED ST:[min: 1577715561210, max: 1577839907009, num_nulls: 0]
但是,您无法像往常一样在 PySpark 中读取相同的文件
java.lang.UnsupportedOperationException: Unsupported encoding: DELTA_BINARY_PACKED
为了读回文件,需要禁用以下配置:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
另外,一个悲伤但额外的提示,Pandas 在撰写本文时不会读取这些文件 post。
Parquet files v2.0 created by spark can't be read by pyarrow