Spark Stream Kafka 和 Hbase 配置

Spark Stream Kafka and Hbase Config

我对 Kafka 和 HBase 的 Spark Streaming 有一些疑问。 下面是我的 spark streaming 程序,这里我使用 zookeeper 配置连接到 Kafka 和 Hbase。 我们真的需要在流代码中进行这种配置吗?或者我做错了 如果我使用 Hortonworks 或 Cloudera 等 hadoop 发行版,应该有配置 spark 与 kafka 和 Hbase 的规定,这样我的 spark 流代码应该只采用 kafka 主题和 Hbase table 没有动物园管理员和其他配置的参数.如果这可以做到,请你帮我完成这些步骤。

object KafkaSparkStream{
  def main(args: Array[String]): Unit =
    {
      var arg = Array("10.74.163.163:9092,10.74.163.154:9092", "10.74.163.154:2181", "test_topic")
      val Array(broker, zk, topic) = arg
      val conf = new SparkConf()
        .setAppName("KafkaSparkStreamToHbase")
        .setMaster("local[2]");
      //.setMaster("yarn-client")
      val ssc = new StreamingContext(conf, Seconds(5))
      val kafkaConf = Map("metadata.broker.list" -> broker,
        "zookeeper.connect" -> zk,
        "group.id" -> "kafka-spark-streaming-example",
        "zookeeper.connection.timeout.ms" -> "1000")
      /* Kafka integration with reciever */
      val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
        ssc, kafkaConf, Map(topic -> 1),
        StorageLevel.MEMORY_ONLY_SER).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
      wordCounts.foreachRDD(rdd => {
        val conf = HBaseConfiguration.create()
        conf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count")
        conf.set("hbase.zookeeper.quorum", "10.74.163.154:2181")
        conf.set("hbase.master", "HOSTNAME:16000");
        conf.set("hbase.rootdir", "file:///tmp/hbase")
        val jobConf = new Configuration(conf)
        jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName)
        jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName)
        jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName)
        //rdd.saveAsNewAPIHadoopDataset(jobConf)
        rdd.map(convert).saveAsNewAPIHadoopDataset(jobConf)
      })
      wordCounts.print()
      ssc.start()
      ssc.awaitTermination()
    }

使用 HBase 的方法是将 hbase-site.xml 配置文件添加到 Spark 类路径。

对于 kafka,您可以使用 https://github.com/typesafehub/config 从自定义配置文件加载属性。 为了使用此配置文件,您必须:

  • 设置--driver-class-path <dir with the config file>
  • 设置--files <configuration file>将此文件复制到每个执行者的工作目录
  • 设置spark.executor.extraClassPath=./将每个执行器的工作目录添加到其类路径