Spark Predicate Pushdown Not Working As Expected
I have a question about Spark's predicate pushdown behavior. Something seems off.
I am using Spark 2.4.5 on macOS. My sample data is the CSV file results2.csv:
val df = spark.read.option("header", "true").csv("/Users/apple/kaggle-data/results2.csv")
Partitioning on 2 columns, country and city:
df.repartition($"country",$"city").write.option("header", "true").partitionBy("country","city").parquet("/Users/apple/kaggle-data/part2/")
Partitioning on 1 column, country:
val df2 = spark.read.option("header", "true").csv("/Users/apple/kaggle-data/results2.csv")
df2.repartition($"country").write.option("header", "true").partitionBy("country").parquet("/Users/apple/kaggle-data/part1/")
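For reference, partitionBy writes Hive-style subdirectories, one `column=value` level per partition column in the order given, and leaves those columns out of the Parquet files themselves (which is why country, and in part2 also city, move to the end of the schema when the data is read back). The pure-Scala sketch below only mimics that naming and needs no Spark; `PartitionLayoutSketch` and its methods are hypothetical, and it ignores Spark's escaping of special characters and the `__HIVE_DEFAULT_PARTITION__` directory used for null values.

```scala
// Hypothetical helper mimicking the Hive-style layout written by partitionBy.
// Simplifications: no escaping of special characters, no null handling.
object PartitionLayoutSketch {
  type Row = Map[String, String]

  // One "column=value" directory level per partition column, in the given order.
  def partitionDir(row: Row, partitionCols: Seq[String]): String =
    partitionCols.map(c => s"$c=${row(c)}").mkString("/")

  // Columns that would be stored inside the Parquet files: partition columns
  // are dropped, because their values live in the directory names.
  def dataColumns(row: Row, partitionCols: Seq[String]): Row =
    row -- partitionCols
}
```

For a row with country England and city London, `partitionDir(row, Seq("country", "city"))` yields `country=England/city=London` (the part2 layout), and `partitionDir(row, Seq("country"))` yields `country=England` (the part1 layout).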
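I read the data partitioned only on country and query with predicates on country and city, but PushedFilters shows city, which is not what I expected. I was expecting country to be there.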
val kaggleDf1 = spark.read.option("header", "true").parquet("/Users/apple/kaggle-data/part1/")
kaggleDf1.where($"country" === "England" && $"city" === "London").explain(true)
Plan:
== Parsed Logical Plan ==
'Filter (('country = England) && ('city = London))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet
== Analyzed Logical Plan ==
date: string, home_team: string, away_team: string, home_score: string, away_score: string, tournament: string, city: string, neutral: string, country: string
Filter ((country#146 = England) && (city#144 = London))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet
== Optimized Logical Plan ==
Filter (((isnotnull(country#146) && isnotnull(city#144)) && (country#146 = England)) && (city#144 = London))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet
== Physical Plan ==
*(1) Project [date#138, home_team#139, away_team#140, home_score#141, away_score#142, tournament#143, city#144, neutral#145, country#146]
+- *(1) Filter (isnotnull(city#144) && (city#144 = London))
+- *(1) FileScan parquet [date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] Batched: true, Format: Parquet, Location: InMemoryFileIndex[/Users/apple/kaggle-data/part1], PartitionCount: 1, PartitionFilters: [isnotnull(country#146), (country#146 = England)], ***PushedFilters: [IsNotNull(city), EqualTo(city,London)]***, ReadSchema: struct<date:string,home_team:string,away_team:string,home_score:string,away_score:string,tourname...
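I read the data partitioned only on country and query with a predicate on country only, but PushedFilters is empty, which is not what I expected. I was expecting country to be there.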
kaggleDf1.where($"country" === "England").explain(true)
Plan:
== Parsed Logical Plan ==
'Filter ('country = England)
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet
== Analyzed Logical Plan ==
date: string, home_team: string, away_team: string, home_score: string, away_score: string, tournament: string, city: string, neutral: string, country: string
Filter (country#146 = England)
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet
== Optimized Logical Plan ==
Filter (isnotnull(country#146) && (country#146 = England))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet
== Physical Plan ==
*(1) FileScan parquet [date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] Batched: true, Format: Parquet, Location: InMemoryFileIndex[/Users/apple/kaggle-data/part1], PartitionCount: 1, PartitionFilters: [isnotnull(country#146), (country#146 = England)], ***PushedFilters: []***, ReadSchema: struct<date:string,home_team:string,away_team:string,home_score:string,away_score:string,tourname...
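I read the data partitioned on both country and city and query with predicates on country and city, but PushedFilters is empty, which is not what I expected. I was expecting country and city to be there.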
val kaggleDf2 = spark.read.option("header", "true").parquet("/Users/apple/kaggle-data/part2/")
kaggleDf2.where($"country" === "England" && $"city" === "London").explain(true)
Plan:
== Parsed Logical Plan ==
'Filter (('country = England) && ('city = London))
+- Relation[date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] parquet
== Analyzed Logical Plan ==
date: string, home_team: string, away_team: string, home_score: string, away_score: string, tournament: string, neutral: string, country: string, city: string
Filter ((country#165 = England) && (city#166 = London))
+- Relation[date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] parquet
== Optimized Logical Plan ==
Filter (((isnotnull(country#165) && isnotnull(city#166)) && (country#165 = England)) && (city#166 = London))
+- Relation[date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] parquet
== Physical Plan ==
*(1) FileScan parquet [date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] Batched: true, Format: Parquet, Location: InMemoryFileIndex[/Users/apple/kaggle-data/part2], PartitionCount: 1, PartitionFilters: [isnotnull(country#165), isnotnull(city#166), (country#165 = England), (city#166 = London)], ***PushedFilters: []***, ReadSchema: struct<date:string,home_team:string,away_team:string,home_score:string,away_score:string,tourname...
Can anyone help me see what is going wrong here? Am I missing something?
I think you have misunderstood this part:
I read data with partition only on country and query on predicate country and city, but the pushdown filter shows city which is not expected, i was expecting country to be here.
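There is a partition filter, used for partition pruning, and there is pushdown, which means filters are pushed down to the data source instead of being applied only after the data has been brought into Spark (although you can disable pushdown). This is done for performance reasons.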
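So there are two aspects. The partition filter means only the matching partitions are read, which saves scan time; then, within that partition or those partitions, the city filter is applied. Parquet is also columnar, and the pushed city filter lets the Parquet reader skip row groups whose statistics show they cannot contain London.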
...PartitionFilters: [isnotnull(country#146), (country#146 = England)], ***PushedFilters: [IsNotNull(city), EqualTo(city,London)]***...
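The split between the two lists can be modelled roughly as follows. This is a simplified sketch, not Spark's code: `FilterSplitSketch`, `Eq` and `NotNull` are hypothetical stand-ins, and every predicate here references exactly one column. In Spark 2.4, FileSourceStrategy treats predicates that reference only partition columns as partition filters, and only predicates that reference no partition column are translated into PushedFilters for the Parquet reader.

```scala
// Simplified model of how a file scan's filters are split. Eq and NotNull are
// hypothetical stand-ins for Spark's expressions; each references one column.
object FilterSplitSketch {
  sealed trait Pred { def column: String }
  final case class Eq(column: String, value: String) extends Pred
  final case class NotNull(column: String) extends Pred

  // Returns (partitionFilters, dataFilters):
  // - partition filters reference only partition columns and prune directories;
  // - data filters reference no partition column and are the candidates that
  //   explain() would list under PushedFilters.
  def split(preds: Seq[Pred], partitionCols: Set[String]): (Seq[Pred], Seq[Pred]) =
    preds.partition(p => partitionCols.contains(p.column))
}
```

With `Set("country")` as the partition columns, the country predicates land in the first list and the city predicates in the second, matching the first plan. With `Set("country", "city")` the second list is empty, matching `PushedFilters: []` in the third plan.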
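So there is no problem; your expectations just need to be adjusted, that's all. You should now be able to work out the second case yourself.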
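This is because of PartitionFilters, and the behavior is expected.
When the data is saved to Parquet with partitionBy and a query matches the partition filter criteria, Spark reads only the subdirectories that match the partition filter. It therefore does not need to apply that filter to the data again, so no filter on those columns appears in PushedFilters at all (the partition columns are not even stored inside the Parquet files; their values come from the directory names).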
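Directory-level pruning can be sketched in the same spirit. `DirectoryPruningSketch` is a hypothetical helper that reads partition values from `column=value` path segments (without the unescaping Spark performs) and keeps only the directories that match every equality filter. Rows from the kept directories satisfy those filters by construction, so there is nothing left to push down for those columns.

```scala
// Hypothetical helper: prunes Hive-style partition directories by equality
// filters. Simplification: path segments are not unescaped as Spark does.
object DirectoryPruningSketch {
  // "base/country=England/city=London" -> Map(country -> England, city -> London)
  def partitionValues(dir: String): Map[String, String] =
    dir.split("/").iterator
      .filter(_.contains("="))
      .map { seg =>
        val i = seg.indexOf('=')
        seg.substring(0, i) -> seg.substring(i + 1)
      }
      .toMap

  // Keep only directories whose partition values satisfy every equality filter.
  // Rows read from the kept directories already satisfy these filters, so the
  // filters never need to be re-evaluated against (or pushed into) the files.
  def prune(dirs: Seq[String], equalities: Map[String, String]): Seq[String] =
    dirs.filter { dir =>
      val values = partitionValues(dir)
      equalities.forall { case (k, v) => values.get(k).contains(v) }
    }
}
```

Given the illustrative directories `part2/country=England/city=London`, `part2/country=England/city=Leeds` and `part2/country=France/city=Paris`, filtering on country England and city London keeps only the first one, which corresponds to `PartitionCount: 1` in the third plan.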
Now, in your case:
kaggleDf1.where($"country" === "England" && $"city" === "London")
PartitionFilters: [isnotnull(country#146), (country#146 = England)]
PushedFilters: [IsNotNull(city), EqualTo(city,London)]
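Spark reads only the files for which country === "England" (because your data was partitioned by country when it was saved), so it does not need to apply that filter to the data again. You will not find this filter anywhere except in PartitionFilters.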