How to configure Spark Streaming Scala app to read from HBase on Hadoop + Yarn
I have HBase on Spark, Hadoop + Yarn, and I want to read from and write to HBase from a Scala application built with SBT, but so far I haven't been able to get the HBase application to build.
Scala application:
/usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala
package com.mydomain.spark.hbasewordcount
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
object HBaseScalaWordCount {
  def main(args: Array[String]) {
    val name = "Example of read from HBase table"
    lazy val sparkConf = new SparkConf().setAppName(name)
    lazy val ssc = new StreamingContext(sparkConf, Seconds(1))
    implicit val config = HBaseConfig() // Assumes hbase-site.xml is on classpath

    val columns = Map(
      "cf1" -> Set("col1", "col2"),
      "cf2" -> Set("col3")
    )

    ssc.hbase[String]("testtable", columns)
      .map({ case (k, v) =>
        val cf1 = v("cf1")
        val col1 = cf1("col1")
        val col2 = cf1("col2")
        val col3 = v("cf2")("col3")
        List(k, col1, col2, col3) mkString "\t"
      })
      .saveAsTextFile("file:/home/hduser/hbasetest-output")
  }
}
SBT file:
/usr/local/sparkapps/HBaseWordCount/HBaseWordCount.sbt
name := "HBaseScalaWordCount"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  "org.apache.hbase" % "hbase-common" % "1.2.1" % "provided",
  "org.apache.hbase" % "hbase-client" % "1.2.1" % "provided",
  "org.apache.hbase" % "hbase-server" % "1.2.1" % "provided",
  "eu.unicredit" %% "hbase-rdd" % "0.7.1"
)
sbt package:
/usr/local/sparkapps/HBaseWordCount$ sbt package
[info] Set current project to HBaseScalaWordCount (in build file:/usr/local/sparkapps/HBaseWordCount/)
[info] Compiling 1 Scala source to /usr/local/sparkapps/HBaseWordCount/target/scala-2.10/classes...
[error] /usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala:29: not found: value HBaseConfig
[error] implicit val config = HBaseConfig() // Assumes hbase-site.xml is on classpath
[error] ^
[error] /usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala:36: value hbase is not a member of org.apache.spark.streaming.StreamingContext
[error] ssc.hbase[String]("testtable", columns)
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 9 s, completed Apr 14, 2016 4:11:40 PM
HBase works fine on Hadoop, but I can't figure out how to configure the classpath for Spark, for example in /usr/local/spark/conf/spark-defaults.conf, which doesn't actually exist; I only have spark-defaults.conf.template.
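For reference, my understanding is that spark-defaults.conf is normally created by copying the template, and that extra jars go on the classpath with entries along these lines, but I don't know whether this is the right approach here, so take it only as a rough sketch:

cp /usr/local/spark/conf/spark-defaults.conf.template /usr/local/spark/conf/spark-defaults.conf

# in spark-defaults.conf
spark.driver.extraClassPath     /usr/local/hbase/hbase-1.1.3/lib/*
spark.executor.extraClassPath   /usr/local/hbase/hbase-1.1.3/lib/*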
SPARK-ENV.SH:
/usr/local/spark/conf/spark-env.sh
export SPARK_MASTER_IP=localhost
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=800m
export SPARK_WORKER_INSTANCES=1
SPARK-DEFAULTS.CONF:
doesn't exist
HBASE PATH:
/usr/local/hbase/hbase-1.1.3/lib/
HBASE-SITE.XML:
/usr/local/hbase/hbase-1.1.3/conf/hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9000/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hduser/hbase/zookeeper</value>
  </property>
</configuration>
First, SBT cannot find the class HBaseConfig. That is because you import org.apache.hadoop.hbase.HBaseConfiguration, while the class you actually need is unicredit.spark.hbase.HBaseConfig.
Your second problem is:
value hbase is not a member of org.apache.spark.streaming.StreamingContext
which means SBT cannot find an hbase method on StreamingContext.
I see you are using hbase-rdd to add HBase support to Spark. If you look at that project's README, you have to add an import line for its implicits, so add this at the top of your class:
import unicredit.spark.hbase._
Implicits are a nice addition to Scala that make it possible to extend the functionality of classes from other packages. With the implicits imported, the hbase method should be available on your SparkContext instance.
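To illustrate the mechanism (the names below are made up for the example and have nothing to do with hbase-rdd), an implicit class can bolt an extra method onto an existing type:

object StringSyntax {
  // once StringSyntax._ is imported, every String gains a shout method
  implicit class RichString(val s: String) extends AnyVal {
    def shout: String = s.toUpperCase + "!"
  }
}

// import StringSyntax._
// "hello".shout   // returns "HELLO!"

hbase-rdd does the same kind of thing for SparkContext.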
Note that you do not have a SparkContext instance yet, only a StreamingContext, so create one first. There is also no need to make them lazy.
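Putting those changes together, here is a minimal sketch of how the program could look (based on the code in the question plus the hbase-rdd README example, so treat it as a starting point rather than tested code):

package com.mydomain.spark.hbasewordcount

import org.apache.spark.{SparkConf, SparkContext}
import unicredit.spark.hbase._

object HBaseScalaWordCount {
  def main(args: Array[String]): Unit = {
    val name = "Example of read from HBase table"
    val sparkConf = new SparkConf().setAppName(name)
    val sc = new SparkContext(sparkConf)   // plain SparkContext, no need for lazy

    // hbase-rdd reads hbase-site.xml from the classpath
    implicit val config = HBaseConfig()

    val columns = Map(
      "cf1" -> Set("col1", "col2"),
      "cf2" -> Set("col3")
    )

    // the hbase method comes from the implicits imported above
    sc.hbase[String]("testtable", columns)
      .map { case (k, v) =>
        val cf1 = v("cf1")
        val col1 = cf1("col1")
        val col2 = cf1("col2")
        val col3 = v("cf2")("col3")
        List(k, col1, col2, col3) mkString "\t"
      }
      .saveAsTextFile("file:/home/hduser/hbasetest-output")

    sc.stop()
  }
}

If you later need streaming, you can still build a StreamingContext on top of this SparkContext, but the hbase call itself is made on the SparkContext.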