基于部分HBase行创建RDD
Create RDD based on a part of HBase rows
我正在尝试根据来自 HBase
table:
的数据创建 RDD
val targetRDD = sparkContext.newAPIHadoopRDD(hBaseConfig,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
.map {
case (key, row) => parse(key, row)
}
parse
为每个 table 行调用,不考虑对数据的进一步操作。
是否可以仅检索具有匹配某些条件(即键在某些特定范围内)的特定键的行,以便仅对它们进行操作?
HBase 是一个 key/value 存储,其中的行按键排序,这意味着:
- 它可以高效地通过键检索单行或通过键范围检索行序列
- 根据某些条件检索随机行效率不高
所有检索操作归结为两个 类:Get and Scan。这不难猜出他们做了什么,扫描将遍历所有行,除非你指定 stopRow/startRow。您还可以在 Scan 上设置过滤器,但它仍然必须迭代所有行,过滤器只是可以降低网络压力,因为 HBase 可能需要 return 更少的行。
TableInputFormat 在您的示例中使用 Scan inside 来访问 Hbase:
public void setConf(Configuration configuration) {
this.conf = configuration;
Scan scan = null;
if (conf.get(SCAN) != null) {
try {
scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
} catch (IOException e) {
LOG.error("An error occurred.", e);
}
} else {
try {
scan = createScanFromConfiguration(conf);
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
}
setScan(scan);
}
此外,createScanFromConfiguration 方法 TableInputFormat 可以提示您如何设置过滤器和键范围:
public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
Scan scan = new Scan();
if (conf.get(SCAN_ROW_START) != null) {
scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
}
if (conf.get(SCAN_ROW_STOP) != null) {
scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
}
if (conf.get(SCAN_COLUMNS) != null) {
addColumns(scan, conf.get(SCAN_COLUMNS));
}
if (conf.get(SCAN_COLUMN_FAMILY) != null) {
scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
}
if (conf.get(SCAN_TIMESTAMP) != null) {
scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
}
if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
scan.setTimeRange(
Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
}
if (conf.get(SCAN_MAXVERSIONS) != null) {
scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
}
if (conf.get(SCAN_CACHEDROWS) != null) {
scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
}
if (conf.get(SCAN_BATCHSIZE) != null) {
scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
}
// false by default, full table scans generate too much BC churn
scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
return scan;
}
这个 Whosebug answer 提供了一个如何在 hbaseConfig
上设置扫描的示例,请注意,虽然您不必设置扫描,但您可以只设置特定的属性,例如 SCAN_ROW_START 和其他来自 createScanFromConfiguration
我上面提到的。
我正在尝试根据来自 HBase
table:
RDD
val targetRDD = sparkContext.newAPIHadoopRDD(hBaseConfig,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
.map {
case (key, row) => parse(key, row)
}
parse
为每个 table 行调用,不考虑对数据的进一步操作。
是否可以仅检索具有匹配某些条件(即键在某些特定范围内)的特定键的行,以便仅对它们进行操作?
HBase 是一个 key/value 存储,其中的行按键排序,这意味着:
- 它可以高效地通过键检索单行或通过键范围检索行序列
- 根据某些条件检索随机行效率不高
所有检索操作归结为两个 类:Get and Scan。这不难猜出他们做了什么,扫描将遍历所有行,除非你指定 stopRow/startRow。您还可以在 Scan 上设置过滤器,但它仍然必须迭代所有行,过滤器只是可以降低网络压力,因为 HBase 可能需要 return 更少的行。
TableInputFormat 在您的示例中使用 Scan inside 来访问 Hbase:
public void setConf(Configuration configuration) {
this.conf = configuration;
Scan scan = null;
if (conf.get(SCAN) != null) {
try {
scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
} catch (IOException e) {
LOG.error("An error occurred.", e);
}
} else {
try {
scan = createScanFromConfiguration(conf);
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
}
}
setScan(scan);
}
此外,createScanFromConfiguration 方法 TableInputFormat 可以提示您如何设置过滤器和键范围:
public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
Scan scan = new Scan();
if (conf.get(SCAN_ROW_START) != null) {
scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
}
if (conf.get(SCAN_ROW_STOP) != null) {
scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
}
if (conf.get(SCAN_COLUMNS) != null) {
addColumns(scan, conf.get(SCAN_COLUMNS));
}
if (conf.get(SCAN_COLUMN_FAMILY) != null) {
scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
}
if (conf.get(SCAN_TIMESTAMP) != null) {
scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
}
if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
scan.setTimeRange(
Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
}
if (conf.get(SCAN_MAXVERSIONS) != null) {
scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
}
if (conf.get(SCAN_CACHEDROWS) != null) {
scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
}
if (conf.get(SCAN_BATCHSIZE) != null) {
scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
}
// false by default, full table scans generate too much BC churn
scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
return scan;
}
这个 Whosebug answer 提供了一个如何在 hbaseConfig
上设置扫描的示例,请注意,虽然您不必设置扫描,但您可以只设置特定的属性,例如 SCAN_ROW_START 和其他来自 createScanFromConfiguration
我上面提到的。