Spark SQL/Hive 查询永远伴随着加入
Spark SQL/Hive Query Takes Forever With Join
所以我正在做一些应该很简单的事情,但显然它不在 Spark 中 SQL。
如果我 运行 在 MySQL 中执行以下查询,查询将在几分之一秒内完成:
SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;
但是,运行在 Spark (1.5.1) 下的 HiveContext 中执行相同的查询需要超过 13 秒。添加更多联接会使查询 运行 持续很长时间(超过 10 分钟)。我不确定我在这里做错了什么以及如何加快速度。
tables 是 MySQL tables 被加载到 Hive 上下文中作为临时 tables.This 是 运行ning 在单个实例中,使用远程计算机上的数据库。
- 用户 table 有大约 480 万行。
- user_address table 有 350,000 行。
tables 有外键字段,但数据库中没有定义明确的 fk 关系。我正在使用 InnoDB。
Spark中的执行计划:
计划:
Scan
JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc,
{user=, password=, url=jdbc:mysql://, dbtable=user})
[address_id#0L,user_address_id#27L]
Filter (user_id#0L = 123) Scan
JDBCRelation(jdbc:mysql://.user_address,
[Lorg.apache.spark.Partition;@2ce558f3,{user=, password=,
url=jdbc:mysql://, dbtable=user_address})[address_id#52L]
ConvertToUnsafe ConvertToUnsafe
TungstenExchange hashpartitioning(address_id#52L) TungstenExchange
hashpartitioning(user_address_id#27L) TungstenSort [address_id#52L
ASC], false, 0 TungstenSort [user_address_id#27L ASC], false, 0
SortMergeJoin [user_address_id#27L], [address_id#52L]
== Physical Plan == TungstenProject [address_id#0L]
首先,您执行的查询类型非常低效。至于现在(Spark 1.5.0*)执行这样的连接,每次执行查询时都必须对 tables 进行洗牌/散列分区。在 users
table 的情况下应该不是问题,其中 user_id = 123
谓词很可能被下推,但仍然需要在 user_address
上完全洗牌。
此外,如果 table 只被注册而不被缓存,那么每次执行这个查询都会从 MySQL 中获取整个 user_address
table 到 Spark。
I'm not sure what I'm doing wrong here and how I can speed things up.
目前还不清楚为什么要将 Spark 用于应用程序,但单机设置、小数据和查询类型表明 Spark 不适合这里。
一般来说,如果应用程序逻辑需要单个记录访问,那么 Spark SQL 将无法正常运行。它是为分析查询而设计的,而不是作为 OLTP 数据库的替代品。
如果单个 table / 数据帧小得多,您可以尝试广播。
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
val user: DataFrame = ???
val user_address: DataFrame = ???
val userFiltered = user.where(???)
user_addresses.join(
broadcast(userFiltered), $"address_id" === $"user_address_id")
* 这应该在 Spark 1.6.0 中更改 SPARK-11410 应该启用持久的 table 分区。
我在类似情况下遇到过同样的问题(Spark 1.5.1,PostgreSQL 9.4)。
鉴于两个table喜欢
val t1 = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:db",
"dbtable" -> "t1")).load()
val t2 = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:db",
"dbtable" -> "t2")).load()
然后在注册的临时 tables 上加入 HQL 会导致对 tables 之一进行完整的 table 扫描(在我的例子中是 child).
无论如何,解决方法是将查询推送到底层 RDBMS:
val joined = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:db",
"dbtable" -> "(select t1.*, t2.* from t1 inner join t2 on ...) as t")).load()
这样底层 RDBMS 的查询优化器就会启动,在我的例子中它切换到索引扫描。另一方面,Spark 下推了两个独立的查询,而 RDBMS 无法真正优化这一点。
所以我正在做一些应该很简单的事情,但显然它不在 Spark 中 SQL。
如果我 运行 在 MySQL 中执行以下查询,查询将在几分之一秒内完成:
SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;
但是,运行在 Spark (1.5.1) 下的 HiveContext 中执行相同的查询需要超过 13 秒。添加更多联接会使查询 运行 持续很长时间(超过 10 分钟)。我不确定我在这里做错了什么以及如何加快速度。
tables 是 MySQL tables 被加载到 Hive 上下文中作为临时 tables.This 是 运行ning 在单个实例中,使用远程计算机上的数据库。
- 用户 table 有大约 480 万行。
- user_address table 有 350,000 行。
tables 有外键字段,但数据库中没有定义明确的 fk 关系。我正在使用 InnoDB。
Spark中的执行计划:
计划:
Scan JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc, {user=, password=, url=jdbc:mysql://, dbtable=user}) [address_id#0L,user_address_id#27L]
Filter (user_id#0L = 123) Scan JDBCRelation(jdbc:mysql://.user_address, [Lorg.apache.spark.Partition;@2ce558f3,{user=, password=, url=jdbc:mysql://, dbtable=user_address})[address_id#52L]
ConvertToUnsafe ConvertToUnsafe
TungstenExchange hashpartitioning(address_id#52L) TungstenExchange hashpartitioning(user_address_id#27L) TungstenSort [address_id#52L ASC], false, 0 TungstenSort [user_address_id#27L ASC], false, 0
SortMergeJoin [user_address_id#27L], [address_id#52L]
== Physical Plan == TungstenProject [address_id#0L]
首先,您执行的查询类型非常低效。至于现在(Spark 1.5.0*)执行这样的连接,每次执行查询时都必须对 tables 进行洗牌/散列分区。在 users
table 的情况下应该不是问题,其中 user_id = 123
谓词很可能被下推,但仍然需要在 user_address
上完全洗牌。
此外,如果 table 只被注册而不被缓存,那么每次执行这个查询都会从 MySQL 中获取整个 user_address
table 到 Spark。
I'm not sure what I'm doing wrong here and how I can speed things up.
目前还不清楚为什么要将 Spark 用于应用程序,但单机设置、小数据和查询类型表明 Spark 不适合这里。
一般来说,如果应用程序逻辑需要单个记录访问,那么 Spark SQL 将无法正常运行。它是为分析查询而设计的,而不是作为 OLTP 数据库的替代品。
如果单个 table / 数据帧小得多,您可以尝试广播。
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
val user: DataFrame = ???
val user_address: DataFrame = ???
val userFiltered = user.where(???)
user_addresses.join(
broadcast(userFiltered), $"address_id" === $"user_address_id")
* 这应该在 Spark 1.6.0 中更改 SPARK-11410 应该启用持久的 table 分区。
我在类似情况下遇到过同样的问题(Spark 1.5.1,PostgreSQL 9.4)。
鉴于两个table喜欢
val t1 = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:db",
"dbtable" -> "t1")).load()
val t2 = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:db",
"dbtable" -> "t2")).load()
然后在注册的临时 tables 上加入 HQL 会导致对 tables 之一进行完整的 table 扫描(在我的例子中是 child).
无论如何,解决方法是将查询推送到底层 RDBMS:
val joined = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:db",
"dbtable" -> "(select t1.*, t2.* from t1 inner join t2 on ...) as t")).load()
这样底层 RDBMS 的查询优化器就会启动,在我的例子中它切换到索引扫描。另一方面,Spark 下推了两个独立的查询,而 RDBMS 无法真正优化这一点。