使用 spark 中的下推查询,如何在 spark-HBASE 中获得并行性(BIGSQL as SQL 引擎)?

With Pushdown query in spark, how to get parallelism in spark-HBASE (BIGSQL as SQL engine)?

在 Spark 中,PushdownQuery 由数据库的 SQL 引擎处理,并根据其结果构建数据框。 因此,spark 查询该查询的结果。

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah )"""

val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.option("numPartitions", 4)
.option("partitionColumn", "COUNTRY_CODE")
.load()

我可以从 spark 中的另一个参考文献 (https://dzone.com/articles/how-apache-spark-makes-your-slow-mysql-queries-10x) 中看到 - mysql,下推查询中的并行性是通过基于参数 numPartitions 和 partitionColumn 触发多个查询来实现的。 这与 sqoop 的分发方式非常相似。 说上面给出的参数 numPartitions = 4 的例子; partitionColumn = COUNTRY_CODE 并且在我们的 table COUNTRY_CODE 值范围内落在 (000,999).

已构建 4 个查询;触发到 DB 并且数据帧是根据这些结果构建的(在这种情况下并行度为 4)。

Q1 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE >= 000 AND COUNTRY_CODE <= 250
Q2 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 250 AND COUNTRY_CODE  <= 500
Q3 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 500 AND COUNTRY_CODE  <= 750
Q4 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 750 AND COUNTRY_CODE  <= 999

我现在的问题是,如何在spark(2.1版)+ hbase(查询引擎-BIGSQL)中使用这种方法实现并行?它现在没有给我并行性。 桥接 spark-hbase 的驱动程序是否需要更新?还是 spark 需要这样做?或者什么样的变化有助于它实现这一目标? 一些方向对我有帮助。谢谢!

为了获得最佳性能,我建议您使用 --num-executors 4 和 --executor-cores 1 开始您的 spark 作业,因为 jdbc 连接是单线程的,每个任务在一个核心上运行询问。通过进行此配置更改,当您的工作 运行 时,您可以观察到并行的任务 运行,这是每个执行程序的核心正在使用中。

改用以下函数:

val connectionProperties: Properties = new Properties
connectionProperties.put("user", "xxxx")
connectionProperties.put("password", "xxxx")
connectionProperties.put("fetchsize", "10000") //fetches 10000 records at once per task
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
connectionProperties

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah ) tbl_alias"""

val dbDataFrame = spark.read.jdbc(url, pushdownQuery, "COUNTRY_CODE", 0L, 4L, 4, connectionProperties)

参考https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,columnName:String,lowerBound:Long,upperBound:Long,numPartitions:Int,connectionProperties:java.util.Properties):org.apache.spark.sql.DataFrame