Spark Stream Kafka 和 Hbase 配置
Spark Stream Kafka and Hbase Config
我对 Kafka 和 HBase 的 Spark Streaming 有一些疑问。
下面是我的 spark streaming 程序,这里我使用 zookeeper 配置连接到 Kafka 和 Hbase。
我们真的需要在流代码中进行这种配置吗?或者我做错了
如果我使用 Hortonworks 或 Cloudera 等 hadoop 发行版,应该有配置 spark 与 kafka 和 Hbase 的规定,这样我的 spark 流代码应该只采用 kafka 主题和 Hbase table 没有动物园管理员和其他配置的参数.如果这可以做到,请你帮我完成这些步骤。
object KafkaSparkStream{
def main(args: Array[String]): Unit =
{
var arg = Array("10.74.163.163:9092,10.74.163.154:9092", "10.74.163.154:2181", "test_topic")
val Array(broker, zk, topic) = arg
val conf = new SparkConf()
.setAppName("KafkaSparkStreamToHbase")
.setMaster("local[2]");
//.setMaster("yarn-client")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaConf = Map("metadata.broker.list" -> broker,
"zookeeper.connect" -> zk,
"group.id" -> "kafka-spark-streaming-example",
"zookeeper.connection.timeout.ms" -> "1000")
/* Kafka integration with reciever */
val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
ssc, kafkaConf, Map(topic -> 1),
StorageLevel.MEMORY_ONLY_SER).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.foreachRDD(rdd => {
val conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count")
conf.set("hbase.zookeeper.quorum", "10.74.163.154:2181")
conf.set("hbase.master", "HOSTNAME:16000");
conf.set("hbase.rootdir", "file:///tmp/hbase")
val jobConf = new Configuration(conf)
jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName)
jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName)
jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName)
//rdd.saveAsNewAPIHadoopDataset(jobConf)
rdd.map(convert).saveAsNewAPIHadoopDataset(jobConf)
})
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
使用 HBase 的方法是将 hbase-site.xml 配置文件添加到 Spark 类路径。
对于 kafka,您可以使用 https://github.com/typesafehub/config 从自定义配置文件加载属性。
为了使用此配置文件,您必须:
- 设置
--driver-class-path <dir with the config file>
- 设置
--files <configuration file>
将此文件复制到每个执行者的工作目录
- 设置
spark.executor.extraClassPath=./
将每个执行器的工作目录添加到其类路径
我对 Kafka 和 HBase 的 Spark Streaming 有一些疑问。 下面是我的 spark streaming 程序,这里我使用 zookeeper 配置连接到 Kafka 和 Hbase。 我们真的需要在流代码中进行这种配置吗?或者我做错了 如果我使用 Hortonworks 或 Cloudera 等 hadoop 发行版,应该有配置 spark 与 kafka 和 Hbase 的规定,这样我的 spark 流代码应该只采用 kafka 主题和 Hbase table 没有动物园管理员和其他配置的参数.如果这可以做到,请你帮我完成这些步骤。
object KafkaSparkStream{
def main(args: Array[String]): Unit =
{
var arg = Array("10.74.163.163:9092,10.74.163.154:9092", "10.74.163.154:2181", "test_topic")
val Array(broker, zk, topic) = arg
val conf = new SparkConf()
.setAppName("KafkaSparkStreamToHbase")
.setMaster("local[2]");
//.setMaster("yarn-client")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaConf = Map("metadata.broker.list" -> broker,
"zookeeper.connect" -> zk,
"group.id" -> "kafka-spark-streaming-example",
"zookeeper.connection.timeout.ms" -> "1000")
/* Kafka integration with reciever */
val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
ssc, kafkaConf, Map(topic -> 1),
StorageLevel.MEMORY_ONLY_SER).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.foreachRDD(rdd => {
val conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count")
conf.set("hbase.zookeeper.quorum", "10.74.163.154:2181")
conf.set("hbase.master", "HOSTNAME:16000");
conf.set("hbase.rootdir", "file:///tmp/hbase")
val jobConf = new Configuration(conf)
jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName)
jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName)
jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName)
//rdd.saveAsNewAPIHadoopDataset(jobConf)
rdd.map(convert).saveAsNewAPIHadoopDataset(jobConf)
})
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
使用 HBase 的方法是将 hbase-site.xml 配置文件添加到 Spark 类路径。
对于 kafka,您可以使用 https://github.com/typesafehub/config 从自定义配置文件加载属性。 为了使用此配置文件,您必须:
- 设置
--driver-class-path <dir with the config file>
- 设置
--files <configuration file>
将此文件复制到每个执行者的工作目录 - 设置
spark.executor.extraClassPath=./
将每个执行器的工作目录添加到其类路径