如何使用 TableMapReduceUtil 运行 对 hbase 扫描器结果进行 mapreduce
How to run mapreduce on hbase scanner result with TableMapReduceUtil
我的 hbase table 看起来像这样:
key---------value
id1/bla value1
id1/blabla value2
id2/bla value3
id2/blabla value4
....
id1开头的key有百万个,id2开头的key有百万个。
我想用mapReduce从hbase读取数据,因为key很多以相同的 ID 开头并且每个 ID 一张地图是不够的。我更喜欢每个 Id
100 个映射器
我希望超过 1 个映射器将 运行 在已按 id 过滤的同一个 scannerResult 上。
我阅读了 TableMapReduceUtil 并尝试了以下操作:
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
使用看起来像这样的地图功能(它应该迭代扫描仪结果):
public static class MyMapper extends TableMapper<Text, IntWritable> {
private final IntWritable ONE = new IntWritable(1);
private Text text = new Text();
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
text.set("123"); // we can only emit Writables...
context.write(text, ONE);
}
}
<br>
我的问题是:
- 地图函数怎么可能得到输入结果而不是 ResultScanner?我知道扫描的结果可以由 ResultScanner 迭代,而 ResultScanner 可以由 Result 迭代。 ResultScanner 有 list\array 个结果,不是吗?
- 如何在 map 函数中迭代扫描仪的结果?
- 我如何控制此函数的拆分数量 do.If 它只打开 10 个映射器,我想要 20 个 是否可以更改某些内容?
- 有没有最简单的方法可以实现我的目标?
我将从您列表中的第 4 位开始:
默认行为是为每个区域创建一个映射器。因此,与其试图破解 TableInputFormat
以根据您的规范创建自定义输入拆分,您应该首先考虑将数据拆分为 100 个区域(然后您将拥有 100 个相当平衡的映射器)。
这种方法提高了你的读写性能,因为你不太容易受到热点的影响(假设你的集群中有一个或两个以上的区域服务器)。
解决此问题的首选方法是预先拆分 table(即在 table 创建时定义拆分)。
我的 hbase table 看起来像这样:
key---------value
id1/bla value1
id1/blabla value2
id2/bla value3
id2/blabla value4
....
id1开头的key有百万个,id2开头的key有百万个。
我想用mapReduce从hbase读取数据,因为key很多以相同的 ID 开头并且每个 ID 一张地图是不够的。我更喜欢每个 Id
100 个映射器
我希望超过 1 个映射器将 运行 在已按 id 过滤的同一个 scannerResult 上。
我阅读了 TableMapReduceUtil 并尝试了以下操作:
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
使用看起来像这样的地图功能(它应该迭代扫描仪结果):
public static class MyMapper extends TableMapper<Text, IntWritable> {
private final IntWritable ONE = new IntWritable(1);
private Text text = new Text();
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
text.set("123"); // we can only emit Writables...
context.write(text, ONE);
}
}
<br>
我的问题是:
- 地图函数怎么可能得到输入结果而不是 ResultScanner?我知道扫描的结果可以由 ResultScanner 迭代,而 ResultScanner 可以由 Result 迭代。 ResultScanner 有 list\array 个结果,不是吗?
- 如何在 map 函数中迭代扫描仪的结果?
- 我如何控制此函数的拆分数量 do.If 它只打开 10 个映射器,我想要 20 个 是否可以更改某些内容?
- 有没有最简单的方法可以实现我的目标?
我将从您列表中的第 4 位开始:
默认行为是为每个区域创建一个映射器。因此,与其试图破解 TableInputFormat
以根据您的规范创建自定义输入拆分,您应该首先考虑将数据拆分为 100 个区域(然后您将拥有 100 个相当平衡的映射器)。
这种方法提高了你的读写性能,因为你不太容易受到热点的影响(假设你的集群中有一个或两个以上的区域服务器)。
解决此问题的首选方法是预先拆分 table(即在 table 创建时定义拆分)。