在 Spark Streaming 中读取 Hbase 数据
Read Hbase data in Spark Streaming
我正在写一个项目来从Kafka接收数据并写入Hbase table。因为我想知道记录的差异,我需要先在 Hbase 中获取具有相同 rowkey 的记录,然后对接收到的记录进行减法,最后将新记录保存到 HBase table.
一开始,我尝试使用newAPIHadoop
从hbase获取数据。这是我的尝试:
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", zkQuorum)
conf.set("hbase.master", masterAddr)
conf.set("hbase.zookeeper.property.clientPort", portNum)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName)
val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
通过这种方式,我能够获取具有特定列族和列名的记录的值ONLY ONCE。只说一次,我的意思是每次我启动我的spark-streaming应用程序时,这段代码都会被执行,我可以得到一个值,但它不会再执行了。因为我想在每次收到来自 Kafka 的记录时使用 cf 和列从 HBase 读取我的记录,这对我不起作用。
为了解决这个问题,我将逻辑移至foreachRDD()
,但不幸的是sparkContext 似乎不可序列化。我收到类似 task is not serialzable
.
的错误
最后,我发现还有一种方法可以使用hbase.clinet HTable从hbase中读取数据。所以这是我的最终作品:
def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = {
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set("hbase.zookeeper.quorum", "xxxxxx")
conf.set("hbase.master", "xxxx")
conf.set("hbase.zookeeper.property.clientPort", "xxx")
conf.set(TableInputFormat.INPUT_TABLE, "xx")
conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx")
val testTable = new HTable(conf, "testTable")
val scan = new Scan
scan.addColumn("cf1".getBytes, "test".getBytes)
val rs = testTable.getScanner(scan)
var r = rs.next()
val res = new StringBuilder
while(r != null){
val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes))
res.append(tmp)
r= rs.next()
}
val res = res.toString
//do the following manipulations and return object (ImmutableBytesWritable, Put)
..............................
.......................
}
在主要方法中,我在 foreachRDD 中使用上述方法,并使用方法 saveAsNewAPIHadoopDataset
保存到 HBase 中
streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration))
现在这对我来说很好用,但我对这个过程有疑问:
这样,我猜,对于 RDD 的每个分区,都会创建一个到 HBase 的连接。我想知道是否可以扩展我的应用程序。假设我在 1 秒内有超过 1000 条记录,看起来我的 spark Streaming 中将设置 1000 个连接。
这是从 hbase 读取数据的正确方法吗?在 sparkStreaming 中从 HBase 读取数据的最佳实践是什么?或者 spark streaming 不应该读取任何数据,它只是设计用于将流数据写入 DB。
提前致谢。
foreachRDD 在各个执行器 jvm 进程上执行。至少你可以在 transferToHBasePut 方法中获得 conf 的单例实例(意味着在使用 jvm 进程的现有设置 conf 或新的 conf 之前进行空检查)。因此,这会将 Hbase 连接数减少到 Spark 集群中生成的执行程序数。
希望这对您有所帮助...
经过一番学习,我为RDD的每个分区创建了一个配置。在 Spark Streaming official website 检查 foreachRDD
的设计模式。实际上Configuration不是一个连接,所以我不知道如何从现有的连接池中获取连接来为Hbase获取和放置记录。
我正在写一个项目来从Kafka接收数据并写入Hbase table。因为我想知道记录的差异,我需要先在 Hbase 中获取具有相同 rowkey 的记录,然后对接收到的记录进行减法,最后将新记录保存到 HBase table.
一开始,我尝试使用newAPIHadoop
从hbase获取数据。这是我的尝试:
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", zkQuorum)
conf.set("hbase.master", masterAddr)
conf.set("hbase.zookeeper.property.clientPort", portNum)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName)
val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
通过这种方式,我能够获取具有特定列族和列名的记录的值ONLY ONCE。只说一次,我的意思是每次我启动我的spark-streaming应用程序时,这段代码都会被执行,我可以得到一个值,但它不会再执行了。因为我想在每次收到来自 Kafka 的记录时使用 cf 和列从 HBase 读取我的记录,这对我不起作用。
为了解决这个问题,我将逻辑移至foreachRDD()
,但不幸的是sparkContext 似乎不可序列化。我收到类似 task is not serialzable
.
最后,我发现还有一种方法可以使用hbase.clinet HTable从hbase中读取数据。所以这是我的最终作品:
def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = {
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set("hbase.zookeeper.quorum", "xxxxxx")
conf.set("hbase.master", "xxxx")
conf.set("hbase.zookeeper.property.clientPort", "xxx")
conf.set(TableInputFormat.INPUT_TABLE, "xx")
conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx")
val testTable = new HTable(conf, "testTable")
val scan = new Scan
scan.addColumn("cf1".getBytes, "test".getBytes)
val rs = testTable.getScanner(scan)
var r = rs.next()
val res = new StringBuilder
while(r != null){
val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes))
res.append(tmp)
r= rs.next()
}
val res = res.toString
//do the following manipulations and return object (ImmutableBytesWritable, Put)
..............................
.......................
}
在主要方法中,我在 foreachRDD 中使用上述方法,并使用方法 saveAsNewAPIHadoopDataset
streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration))
现在这对我来说很好用,但我对这个过程有疑问:
这样,我猜,对于 RDD 的每个分区,都会创建一个到 HBase 的连接。我想知道是否可以扩展我的应用程序。假设我在 1 秒内有超过 1000 条记录,看起来我的 spark Streaming 中将设置 1000 个连接。
这是从 hbase 读取数据的正确方法吗?在 sparkStreaming 中从 HBase 读取数据的最佳实践是什么?或者 spark streaming 不应该读取任何数据,它只是设计用于将流数据写入 DB。
提前致谢。
foreachRDD 在各个执行器 jvm 进程上执行。至少你可以在 transferToHBasePut 方法中获得 conf 的单例实例(意味着在使用 jvm 进程的现有设置 conf 或新的 conf 之前进行空检查)。因此,这会将 Hbase 连接数减少到 Spark 集群中生成的执行程序数。
希望这对您有所帮助...
经过一番学习,我为RDD的每个分区创建了一个配置。在 Spark Streaming official website 检查 foreachRDD
的设计模式。实际上Configuration不是一个连接,所以我不知道如何从现有的连接池中获取连接来为Hbase获取和放置记录。