高效读取 Spark 中的嵌套镶木地板列
Efficient reading nested parquet column in Spark
我有以下(简化的)架构:
root
|-- event: struct (nullable = true)
| |-- spent: struct (nullable = true)
| | |-- amount: decimal(34,3) (nullable = true)
| | |-- currency: string (nullable = true)
| |
| | ... ~ 20 other struct fields on "event" level
我正在尝试对嵌套字段求和
spark.sql("select sum(event.spent.amount) from event")
根据 spark 指标,我正在从磁盘读取 18 GB,这需要 2.5 分钟。
但是当我 select 顶级字段时:
spark.sql("select sum(amount) from event")
我在 4 秒内只读取了 2GB。
从物理计划中我可以看到,在嵌套结构的情况下,整个事件结构所有字段都是从镶木地板中读取的,这是一种浪费。
Parquet 格式应该能够从嵌套结构中提供所需的列,而无需全部读取(这是列存储的要点)。有什么方法可以在 Spark 中有效地做到这一点吗?
解决方案:
spark.sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=true")
spark.sql("select sum(amount) from (select event.spent.amount as amount from event_archive)")
查询必须以 sub-select 方式编写。您不能将 selected 列包装在聚合函数中。以下查询将破坏架构修剪:
select sum(event.spent.amount) as amount from event
涵盖了整个模式修剪工作
肮脏的解决方法也可以在加载时指定 "projected schema":
val DecimalType = DataTypes.createDecimalType(18, 4)
val schema = StructType(StructField("event", StructType(
StructField("spent", StructType(
StructField("amount", DecimalType, true) :: Nil
), true) :: Nil
), true) :: Nil
)
val df = spark.read.format("parquet").schema(schema).load(<path>)
我有以下(简化的)架构:
root
|-- event: struct (nullable = true)
| |-- spent: struct (nullable = true)
| | |-- amount: decimal(34,3) (nullable = true)
| | |-- currency: string (nullable = true)
| |
| | ... ~ 20 other struct fields on "event" level
我正在尝试对嵌套字段求和
spark.sql("select sum(event.spent.amount) from event")
根据 spark 指标,我正在从磁盘读取 18 GB,这需要 2.5 分钟。
但是当我 select 顶级字段时:
spark.sql("select sum(amount) from event")
我在 4 秒内只读取了 2GB。
从物理计划中我可以看到,在嵌套结构的情况下,整个事件结构所有字段都是从镶木地板中读取的,这是一种浪费。
Parquet 格式应该能够从嵌套结构中提供所需的列,而无需全部读取(这是列存储的要点)。有什么方法可以在 Spark 中有效地做到这一点吗?
解决方案:
spark.sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=true")
spark.sql("select sum(amount) from (select event.spent.amount as amount from event_archive)")
查询必须以 sub-select 方式编写。您不能将 selected 列包装在聚合函数中。以下查询将破坏架构修剪:
select sum(event.spent.amount) as amount from event
涵盖了整个模式修剪工作
肮脏的解决方法也可以在加载时指定 "projected schema":
val DecimalType = DataTypes.createDecimalType(18, 4)
val schema = StructType(StructField("event", StructType(
StructField("spent", StructType(
StructField("amount", DecimalType, true) :: Nil
), true) :: Nil
), true) :: Nil
)
val df = spark.read.format("parquet").schema(schema).load(<path>)