为什么 HBase API 在使用前缀过滤器扫描 table 后返回空集?
Why is the HBase API returning empty set after scanning a table with a prefix filter?
我正在做一个实时管道,将 Spark Streaming 与 HBase 连接起来。为了这个过程,我必须在 HBase table 中执行一个过滤器,特别是前缀过滤器,因为我想匹配键以某个字符串开头的记录。
我过滤的 table 叫做 "hm_notificaciones"。我可以成功连接到 Hbase shell 并从命令行扫描 table。 运行 以下命令:
scan "hm_notificaciones"
我得到以下记录:
行列+单元格
46948854-20180307 column=info_oferta:id_oferta, timestamp=1520459448795, value=123456
46948854-20180312170423 column=info_oferta:id_establecimiento, timestamp=1520892403770, value=9999
46948854-20180312170423 column=info_oferta:id_oferta, timestamp=1520892390858, value=123445
46948854-20180312170536 column=info_oferta:id_establecimiento, timestamp=1520892422044, value=9239
46948854-20180312170536 column=info_oferta:id_oferta, timestamp=1520892435173, value=4432
46948854-20180313110824 column=info_oferta:id_establecimiento, timestamp=1520957374921, value=9990
46948854-20180313110824 column=info_oferta:id_oferta, timestamp=1520957362458, value=12313
我一直在使用 Hbase API 绑定 运行 前缀过滤器。我正在编写一些 Scala 代码来连接到 API 并制作过滤器。以下代码编译并执行,但它 returns 一个空结果:
def scanTable( table_name:String, family: String, search_key: String )= {
val conf: Configuration = HBaseConfiguration.create()
val connection: Connection = ConnectionFactory.createConnection(conf)
// This is a test to verify if I can connect to HBase API.
//This statements work and print all the table names in HBase
val admin = connection.getAdmin
println("Listing all tablenames")
val list_table_names = admin.listTableNames()
list_table_names.foreach(println)
val table: Table = connection.getTable( TableName.valueOf(table_name) )
//val htable = new HTable(conf, tableName)
var colValueMap: Map[String, String] = Map()
var keyColValueMap: Map[String, Map[String, String]] = Map()
val prefix = Bytes.toBytes(search_key)
val scan = new Scan(prefix)
scan.addFamily(Bytes.toBytes(family))
val prefix_filter = new PrefixFilter(prefix)
scan.setFilter(prefix_filter)
val scanner = table.getScanner(scan)
for( row <- scanner){
val content = row.getNoVersionMap
for( entry <- content.entrySet ){
for( sub_entry <- entry.getValue.entrySet){
colValueMap += (Bytes.toString( sub_entry.getKey) -> Bytes.toString(sub_entry.getValue) )
}
keyColValueMap += (Bytes.toString(row.getRow) -> colValueMap )
}
}
//this doesn't execute
for( ( k, v) <- colValueMap) {
printf( "key: %s", "value: %s\n", k, v )
}
//this never executes since scanner is null (or empty)
for (result <- scanner) {
for (cell <- result.rawCells) {
println("Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
}
}
scanner.close
table.close
connection.close
}
我已经尝试了两种 print/get 数据的方法:组成一个 Map 并遍历 ResultScanner。但是,我的过滤器似乎无法正常工作,因为它返回的是 null/empty 集。
您知道是否有其他方法可以在 Hbase 上执行前缀过滤器吗?
我用来测试上述代码的代码如下:
user_key = "46948854-20181303144609"
scanTable("hm_notificaciones", "info_oferta", user_key)
第二个循环,不会进入,因为你在上一步已经迭代了scanner
for (result <- scanner) {
for (cell <- result.rawCells) {
println("Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
}
}
并使用keyColValueMap 进行打印。它对我有用,再次检查你的前缀过滤器。
for( ( k, v) <- colValueMap) {
printf( "key: %s", "value: %s\n", k, v )
}
我正在做一个实时管道,将 Spark Streaming 与 HBase 连接起来。为了这个过程,我必须在 HBase table 中执行一个过滤器,特别是前缀过滤器,因为我想匹配键以某个字符串开头的记录。
我过滤的 table 叫做 "hm_notificaciones"。我可以成功连接到 Hbase shell 并从命令行扫描 table。 运行 以下命令:
scan "hm_notificaciones"
我得到以下记录:
行列+单元格
46948854-20180307 column=info_oferta:id_oferta, timestamp=1520459448795, value=123456
46948854-20180312170423 column=info_oferta:id_establecimiento, timestamp=1520892403770, value=9999
46948854-20180312170423 column=info_oferta:id_oferta, timestamp=1520892390858, value=123445
46948854-20180312170536 column=info_oferta:id_establecimiento, timestamp=1520892422044, value=9239
46948854-20180312170536 column=info_oferta:id_oferta, timestamp=1520892435173, value=4432
46948854-20180313110824 column=info_oferta:id_establecimiento, timestamp=1520957374921, value=9990
46948854-20180313110824 column=info_oferta:id_oferta, timestamp=1520957362458, value=12313
我一直在使用 Hbase API 绑定 运行 前缀过滤器。我正在编写一些 Scala 代码来连接到 API 并制作过滤器。以下代码编译并执行,但它 returns 一个空结果:
def scanTable( table_name:String, family: String, search_key: String )= {
val conf: Configuration = HBaseConfiguration.create()
val connection: Connection = ConnectionFactory.createConnection(conf)
// This is a test to verify if I can connect to HBase API.
//This statements work and print all the table names in HBase
val admin = connection.getAdmin
println("Listing all tablenames")
val list_table_names = admin.listTableNames()
list_table_names.foreach(println)
val table: Table = connection.getTable( TableName.valueOf(table_name) )
//val htable = new HTable(conf, tableName)
var colValueMap: Map[String, String] = Map()
var keyColValueMap: Map[String, Map[String, String]] = Map()
val prefix = Bytes.toBytes(search_key)
val scan = new Scan(prefix)
scan.addFamily(Bytes.toBytes(family))
val prefix_filter = new PrefixFilter(prefix)
scan.setFilter(prefix_filter)
val scanner = table.getScanner(scan)
for( row <- scanner){
val content = row.getNoVersionMap
for( entry <- content.entrySet ){
for( sub_entry <- entry.getValue.entrySet){
colValueMap += (Bytes.toString( sub_entry.getKey) -> Bytes.toString(sub_entry.getValue) )
}
keyColValueMap += (Bytes.toString(row.getRow) -> colValueMap )
}
}
//this doesn't execute
for( ( k, v) <- colValueMap) {
printf( "key: %s", "value: %s\n", k, v )
}
//this never executes since scanner is null (or empty)
for (result <- scanner) {
for (cell <- result.rawCells) {
println("Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
}
}
scanner.close
table.close
connection.close
}
我已经尝试了两种 print/get 数据的方法:组成一个 Map 并遍历 ResultScanner。但是,我的过滤器似乎无法正常工作,因为它返回的是 null/empty 集。
您知道是否有其他方法可以在 Hbase 上执行前缀过滤器吗?
我用来测试上述代码的代码如下:
user_key = "46948854-20181303144609"
scanTable("hm_notificaciones", "info_oferta", user_key)
第二个循环,不会进入,因为你在上一步已经迭代了scanner
for (result <- scanner) {
for (cell <- result.rawCells) {
println("Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
}
}
并使用keyColValueMap 进行打印。它对我有用,再次检查你的前缀过滤器。
for( ( k, v) <- colValueMap) {
printf( "key: %s", "value: %s\n", k, v )
}