如何通过 Spark 使用 HBase ColumnRangeFilter
How to use HBase ColumnRangeFilter by Spark
正在考虑如何通过Spark使用HBase ColumnRangeFilter
我查看 org.apache.hadoop.hbase.mapreduce.TableInputFormat,但是这个 API 不包含 ColumnRangeFilter。
所以我不知道如何通过 Spark 执行 ColumnRangeFilter。
我想使用以“20170225”开头并以“20170305”结尾的 ColumnRangeFilter。
我发现像代码下面那样扫描行。
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "like_count")
val startRow = "001"
val endRow = "100"
conf.set(TableInputFormat.SCAN_ROW_START, startRow)
conf.set(TableInputFormat.SCAN_ROW_STOP, endRow)
sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
我需要添加什么代码?
如果有人有什么建议,请告诉我。
使用扫描对象设置开始行和结束行,并在 Hbase 配置中设置该扫描对象,然后将该配置对象传递给 tableInputFormat
https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html
Scan scan = new Scan(startRow, endRow);
scan.setMaxVersions(MAX_VERSIONS);
//This can also be done if not specified in scan object constructor
scan.setFilter(new ColumnRangeFilter(startrow,true,endrow,true));
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
conf.set(TableInputFormat.INPUT_TABLE, username + ":" + path);
conf.set(TableInputFormat.SCAN, convertScanToString(scan));
tableInputFormat.setConf(conf);
正在考虑如何通过Spark使用HBase ColumnRangeFilter
我查看 org.apache.hadoop.hbase.mapreduce.TableInputFormat,但是这个 API 不包含 ColumnRangeFilter。
所以我不知道如何通过 Spark 执行 ColumnRangeFilter。
我想使用以“20170225”开头并以“20170305”结尾的 ColumnRangeFilter。
我发现像代码下面那样扫描行。
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "like_count")
val startRow = "001"
val endRow = "100"
conf.set(TableInputFormat.SCAN_ROW_START, startRow)
conf.set(TableInputFormat.SCAN_ROW_STOP, endRow)
sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
我需要添加什么代码? 如果有人有什么建议,请告诉我。
使用扫描对象设置开始行和结束行,并在 Hbase 配置中设置该扫描对象,然后将该配置对象传递给 tableInputFormat https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html
Scan scan = new Scan(startRow, endRow);
scan.setMaxVersions(MAX_VERSIONS);
//This can also be done if not specified in scan object constructor
scan.setFilter(new ColumnRangeFilter(startrow,true,endrow,true));
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
conf.set(TableInputFormat.INPUT_TABLE, username + ":" + path);
conf.set(TableInputFormat.SCAN, convertScanToString(scan));
tableInputFormat.setConf(conf);