如何在 spark 中使用整个配置单元数据库并从外部文件读取 sql 查询?

how to use a whole hive database in spark and read sql queries from external files?

我在 Azure 中使用带有 spark 1.6 的 hortonworks 沙箱。 我有一个填充了 TPC-DS 示例数据的 Hive 数据库。我想从外部文件中读取一些 SQL 查询,并在 spark.hive 数据集上读取 运行 它们。 我遵循这个主题 Using hive database in spark,它只是在我的数据集中使用 table 并且它再次在 spark 中写入 SQL 查询,但我需要将整个数据集定义为我的查询源那,我想我应该使用数据框,但我不确定也不知道如何使用! 我还想从外部 .sql 文件导入 SQL 查询,不要再写下查询! 你能指导我怎么做吗? 非常感谢你, 最佳!

Spark 可以直接从 Hive 读取数据table。您可以使用 Spark 创建、删除 Hive table,甚至可以通过 Spark 执行所有与 Hive hql 相关的操作。为此,您需要使用 Spark HiveContext

来自 Spark 文档:

Spark HiveContext,提供了基本 SQLContext 提供的功能的超集。其他功能包括使用更完整的 HiveQL 解析器编写查询的能力、访问 Hive UDF 以及从 Hive tables 读取数据的能力。要使用 HiveContext,您不需要现有的 Hive 设置。

更多信息请访问Spark Documentation

为了避免在代码中编写 sql,您可以使用 属性 文件,您可以在其中放置所有 Hive 查询,然后您可以在代码中使用密钥。

下面请看Spark HiveContext的实现和Spark Scala中属性文件的使用。

package com.spark.hive.poc

import org.apache.spark._
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql._
import org.apache.spark._
import org.apache.spark.sql.DataFrame;
import org.apache.spark.rdd.RDD;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.hive.HiveContext;

//Import Row.
import org.apache.spark.sql.Row;
//Import Spark SQL data types
import org.apache.spark.sql.types.{ StructType, StructField, StringType };

object ReadPropertyFiles extends Serializable {

  val conf = new SparkConf().setAppName("read local file");

  conf.set("spark.executor.memory", "100M");
  conf.setMaster("local");

  val sc = new SparkContext(conf)
  val sqlContext = new HiveContext(sc)

  def main(args: Array[String]): Unit = {

    var hadoopConf = new org.apache.hadoop.conf.Configuration();
    var fileSystem = FileSystem.get(hadoopConf);
    var Path = new Path(args(0));
    val inputStream = fileSystem.open(Path);
    var Properties = new java.util.Properties;
    Properties.load(inputStream);

    //Create an RDD
    val people = sc.textFile("/user/User1/spark_hive_poc/input/");
    //The schema is encoded in a string
    val schemaString = "name address";

    //Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)));

    //Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim));
    //Apply the schema to the RDD.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
    peopleDataFrame.printSchema();

    peopleDataFrame.registerTempTable("tbl_temp")

    val data = sqlContext.sql(Properties.getProperty("temp_table"));

    //Drop Hive table
    sqlContext.sql(Properties.getProperty("drop_hive_table"));
    //Create Hive table
    sqlContext.sql(Properties.getProperty("create_hive_tavle"));
    //Insert data into Hive table
    sqlContext.sql(Properties.getProperty("insert_into_hive_table"));
    //Select Data into Hive table
    sqlContext.sql(Properties.getProperty("select_from_hive")).show();

    sc.stop

  }
}

属性文件中的条目:

temp_table=select * from tbl_temp
drop_hive_table=DROP TABLE IF EXISTS default.test_hive_tbl
create_hive_tavle=CREATE TABLE IF NOT EXISTS default.test_hive_tbl(name string, city string) STORED AS ORC
insert_into_hive_table=insert overwrite table default.test_hive_tbl select * from tbl_temp
select_from_hive=select * from default.test_hive_tbl

Spark 将命令提交给 运行 此作业:

[User1@hadoopdev ~]$ spark-submit --num-executors 1 \
--executor-memory 100M --total-executor-cores 2 --master local \
--class com.spark.hive.poc.ReadPropertyFiles Hive-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
/user/User1/spark_hive_poc/properties/sql.properties

注意: 属性 文件位置应为 HDFS 位置。