Spark SQL/Hive 查询永远伴随着加入

Spark SQL/Hive Query Takes Forever With Join

所以我正在做一些应该很简单的事情,但显然它不在 Spark 中 SQL。

如果我 运行 在 MySQL 中执行以下查询,查询将在几分之一秒内完成:

SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;

但是,运行在 Spark (1.5.1) 下的 HiveContext 中执行相同的查询需要超过 13 秒。添加更多联接会使查询 运行 持续很长时间(超过 10 分钟)。我不确定我在这里做错了什么以及如何加快速度。

tables 是 MySQL tables 被加载到 Hive 上下文中作为临时 tables.This 是 运行ning 在单个实例中,使用远程计算机上的数据库。

tables 有外键字段,但数据库中没有定义明确的 fk 关系。我正在使用 InnoDB。

Spark中的执行计划:

计划:

Scan JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc, {user=, password=, url=jdbc:mysql://, dbtable=user}) [address_id#0L,user_address_id#27L]

Filter (user_id#0L = 123) Scan JDBCRelation(jdbc:mysql://.user_address, [Lorg.apache.spark.Partition;@2ce558f3,{user=, password=, url=jdbc:mysql://, dbtable=user_address})[address_id#52L]

ConvertToUnsafe ConvertToUnsafe

TungstenExchange hashpartitioning(address_id#52L) TungstenExchange hashpartitioning(user_address_id#27L) TungstenSort [address_id#52L ASC], false, 0 TungstenSort [user_address_id#27L ASC], false, 0

SortMergeJoin [user_address_id#27L], [address_id#52L]

== Physical Plan == TungstenProject [address_id#0L]

首先,您执行的查询类型非常低效。至于现在(Spark 1.5.0*)执行这样的连接,每次执行查询时都必须对 tables 进行洗牌/散列分区。在 users table 的情况下应该不是问题,其中 user_id = 123 谓词很可能被下推,但仍然需要在 user_address 上完全洗牌。

此外,如果 table 只被注册而不被缓存,那么每次执行这个查询都会从 MySQL 中获取整个 user_address table 到 Spark。

I'm not sure what I'm doing wrong here and how I can speed things up.

目前还不清楚为什么要将 Spark 用于应用程序,但单机设置、小数据和查询类型表明 Spark 不适合这里。

一般来说,如果应用程序逻辑需要单个记录访问,那么 Spark SQL 将无法正常运行。它是为分析查询而设计的,而不是作为 OLTP 数据库的替代品。

如果单个 table / 数据帧小得多,您可以尝试广播。

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

val user: DataFrame = ???
val user_address: DataFrame = ???

val userFiltered = user.where(???)

user_addresses.join(
  broadcast(userFiltered), $"address_id" === $"user_address_id")

* 这应该在 Spark 1.6.0 中更改 SPARK-11410 应该启用持久的 table 分区。

我在类似情况下遇到过同样的问题(Spark 1.5.1,PostgreSQL 9.4)。

鉴于两个table喜欢

val t1 = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "t1")).load()

val t2 = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "t2")).load()

然后在注册的临时 tables 上加入 HQL 会导致对 tables 之一进行完整的 table 扫描(在我的例子中是 child).

无论如何,解决方法是将查询推送到底层 RDBMS:

val joined = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "(select t1.*, t2.* from t1 inner join t2 on ...) as t")).load()

这样底层 RDBMS 的查询优化器就会启动,在我的例子中它切换到索引扫描。另一方面,Spark 下推了两个独立的查询,而 RDBMS 无法真正优化这一点。