Difference between filter and where in Scala Spark SQL
I tried both, but they give the same result.
Example:
val items = List(1, 2, 3)
Using filter:
employees.filter($"emp_id".isin(items:_*)).show
Using where:
employees.where($"emp_id".isin(items:_*)).show
The result is the same for both:
+------+------+------+-------+------+-------+
|EMP_ID|F_NAME|SALARY|DEPT_ID|L_NAME|MANAGER|
+------+------+------+-------+------+-------+
| 6| E6| 2000| 4| L6| 2|
| 7| E7| 3000| 4| L7| 1|
| 8| E8| 4000| 2| L8| 2|
| 9| E9| 1500| 2| L9| 1|
| 10| E10| 1000| 2| L10| 1|
| 4| E4| 400| 3| L4| 1|
| 2| E2| 200| 1| L2| 1|
| 3| E3| 700| 2| L3| 2|
| 5| E5| 300| 2| L5| 2|
+------+------+------+-------+------+-------+
Filters rows using the given condition. This is an alias for filter.
filter is simply the standard Scala (and FP in general) name for such a function; where is there for people who prefer SQL.
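If you want to verify this yourself, a minimal sketch (reusing the employees DataFrame and items list from the question) is to explain both variants and compare the plans, which come out identical:

import org.apache.spark.sql.functions.col

// Both calls build the same logical plan, because `where` simply delegates to `filter`.
val byFilter = employees.filter(col("emp_id").isin(items: _*))
val byWhere  = employees.where(col("emp_id").isin(items: _*))

byFilter.explain()  // physical plan of the filter version
byWhere.explain()   // physical plan of the where version -- same plan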
It is also related to Spark optimization. Take a short example:
a large Parquet file in HDFS with the following structure and data:
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet
Found 27 items
drwxr-xr-x - root root 0 2019-01-16 12:55 /user/tickers/ticks.parquet/ticker_id=1
drwxr-xr-x - root root 0 2019-01-16 13:58 /user/tickers/ticks.parquet/ticker_id=10
drwxr-xr-x - root root 0 2019-01-16 14:04 /user/tickers/ticks.parquet/ticker_id=11
drwxr-xr-x - root root 0 2019-01-16 14:10 /user/tickers/ticks.parquet/ticker_id=12
...
where each partition contains sub-partitions (by date):
[hadoop@hdpnn ~]$ hadoop fs -ls /user/tickers/ticks.parquet/ticker_id=1
Found 6 items
drwxr-xr-x - root root 0 2019-01-16 12:55 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-09
drwxr-xr-x - root root 0 2019-01-16 12:50 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-10
drwxr-xr-x - root root 0 2019-01-16 12:53 /user/tickers/ticks.parquet/ticker_id=1/ddate=2019-01-11
...
Schema:
scala> spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet").printSchema
root
|-- ticker_id: integer (nullable = true)
|-- ddate: date (nullable = true)
|-- db_tsunx: long (nullable = true)
|-- ask: double (nullable = true)
|-- bid: double (nullable = true)
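For context, this layout (one directory per ticker_id, with ddate sub-directories) is what you get when the Parquet data is written partitioned by those two columns; a hedged sketch, where the ticks DataFrame name is hypothetical:

// Sketch only: assumes a `ticks` DataFrame with the schema shown above.
ticks.write
  .partitionBy("ticker_id", "ddate")   // creates ticker_id=<v>/ddate=<v> directories
  .parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")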
For example, you have a DS like this:
val maxTsunx = spark.read.parquet("hdfs://hdpnn:9000/user/tickers/ticks.parquet")
  .select(col("ticker_id"), col("db_tsunx"))
  .groupBy("ticker_id")
  .agg(max("db_tsunx"))
which contains max(db_tsunx) for each ticker_id.
For example, you want the data for only one ticker from this DS.
You have two ways:
1) maxTsunx.filter(r => r.get(0) == 1)
2) maxTsunx.where(col("ticker_id")===1)
These produce very different "Physical Plans". Take a look:
1)
== Physical Plan ==
*(2) Filter <function1>.apply
+- *(2) HashAggregate(keys=[ticker_id#37], functions=[max(db_tsunx#39L)], output=[ticker_id#37, max(db_tsunx)#52L])
+- Exchange hashpartitioning(ticker_id#37, 200)
+- *(1) HashAggregate(keys=[ticker_id#37], functions=[partial_max(db_tsunx#39L)], output=[ticker_id#37, max#61L])
+- *(1) Project [ticker_id#37, db_tsunx#39L]
+- *(1) FileScan parquet [db_tsunx#39L,ticker_id#37,ddate#38] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdpnn:9000/user/tickers/ticks.parquet],
PartitionCount: 162,
PartitionFilters: [],
PushedFilters: [],
ReadSchema: struct<db_tsunx:bigint>
2)
== Physical Plan ==
*(2) HashAggregate(keys=[ticker_id#84], functions=[max(db_tsunx#86L)], output=[ticker_id#84, max(db_tsunx)#99L])
+- Exchange hashpartitioning(ticker_id#84, 200)
+- *(1) HashAggregate(keys=[ticker_id#84], functions=[partial_max(db_tsunx#86L)], output=[ticker_id#84, max#109L])
+- *(1) Project [ticker_id#84, db_tsunx#86L]
+- *(1) FileScan parquet [db_tsunx#86L,ticker_id#84,ddate#85] Batched: true, Format: Parquet,
Location: InMemoryFileIndex[hdfs://hdpnn:9000/user/tickers/ticks.parquet],
PartitionCount: 6,
PartitionFilters: [isnotnull(ticker_id#84), (ticker_id#84 = 1)],
PushedFilters: [],
ReadSchema: struct<db_tsunx:bigint>
Compare PartitionCount: 162 vs 6, and
PartitionFilters: []
vs
PartitionFilters: [isnotnull(ticker_id#84), (ticker_id#84 = 1)]
This means that in the second case the filter condition on the DS is visible to Spark and is used for optimization (partition pruning).
Just FYI, comparing
maxTsunx.filter(r => r.get(0) == 1)
with
maxTsunx.where(col("ticker_id") === 1)
or
maxTsunx.filter(col("ticker_id") === 1)
the first case passes a function to the filter function,
while the second case passes a condition expression (a String or a Column) to the filter or where function.
Physical plan 2) can also be obtained by replacing where with filter, as long as you pass a Column/String condition rather than a lambda.
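To see this for yourself, a minimal sketch (reusing maxTsunx from above) is to call explain() on each variant and look at the PartitionFilters in the FileScan node:

// The lambda version is opaque to Catalyst: no partition pruning (PartitionFilters: []).
maxTsunx.filter(r => r.get(0) == 1).explain()

// Column / SQL-string conditions are visible to Catalyst, so the filter on ticker_id
// can be pushed down to the FileScan (PartitionFilters: [... ticker_id = 1 ...]).
maxTsunx.where(col("ticker_id") === 1).explain()
maxTsunx.filter(col("ticker_id") === 1).explain()
maxTsunx.filter("ticker_id = 1").explain()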