MapReduce 扫描 HBase 时,Reducer 的个数始终为一个
during HBase scan with MapReduce, the number of Reducer is always one
我在Mapper中做HBase扫描,然后Reducer将结果写入HDFS。
mapper输出的记录数大概是1,000,000,000条。
问题是减速器的数量总是一个,虽然我设置了-Dmapred.reduce.tasks=100
。 reduce过程很慢。
// 祝方泽编辑于2016-12-04
我的主要代码 class:
public class GetUrlNotSent2SpiderFromHbase extends Configured implements Tool {
public int run(String[] arg0) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, conf.get("mapred.job.name"));
String input_table = conf.get("input.table");
job.setJarByClass(GetUrlNotSent2SpiderFromHbase.class);
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sitemap_type"));
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("is_send_to_spider"));
TableMapReduceUtil.initTableMapperJob(
input_table,
scan,
GetUrlNotSent2SpiderFromHbaseMapper.class,
Text.class,
Text.class,
job);
/*job.setMapperClass(GetUrlNotSent2SpiderFromHbaseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);*/
job.setReducerClass(GetUrlNotSent2SpiderFromHbaseReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
if (job.waitForCompletion(true) && job.isSuccessful()) {
return 0;
}
return -1;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int res = ToolRunner.run(conf, new GetUrlNotSent2SpiderFromHbase(), args);
System.exit(res);
}
}
这是 运行 此 MapReduce 作业的脚本:
table="xxx"
output="yyy"
sitemap_type="zzz"
JOBCONF=""
JOBCONF="${JOBCONF} -Dmapred.job.name=test_for_scan_hbase"
JOBCONF="${JOBCONF} -Dinput.table=$table"
JOBCONF="${JOBCONF} -Dmapred.output.dir=$output"
JOBCONF="${JOBCONF} -Ddemand.sitemap.type=$sitemap_type"
JOBCONF="${JOBCONF} -Dyarn.app.mapreduce.am.command-opts='-Xmx8192m'"
JOBCONF="${JOBCONF} -Dyarn.app.mapreduce.am.resource.mb=9216"
JOBCONF="${JOBCONF} -Dmapreduce.map.java.opts='-Xmx1536m'"
JOBCONF="${JOBCONF} -Dmapreduce.map.memory.mb=2048"
JOBCONF="${JOBCONF} -Dmapreduce.reduce.java.opts='-Xmx1536m'"
JOBCONF="${JOBCONF} -Dmapreduce.reduce.memory.mb=2048"
JOBCONF="${JOBCONF} -Dmapred.reduce.tasks=100"
JOBCONF="${JOBCONF} -Dmapred.job.priority=VERY_HIGH"
hadoop fs -rmr $output
hadoop jar get_url_not_sent_2_spider_from_hbase_hourly.jar hourly.GetUrlNotSent2SpiderFromHbase $JOBCONF
echo "===== scan HBase finished ====="
我在代码中设置了job.setNumReduceTasks(100);
,它起作用了。
既然你提到只有一个 reduce 在工作,这就是 reducer 非常慢的明显原因。
了解应用于作业的配置属性的统一方式(您为执行的每个作业调用此方法以了解参数是否正确传递):
将以下方法添加到上面提到的作业驱动程序中,以打印从所有可能来源应用的配置条目,即来自 -D 或其他地方请在提交作业之前在驱动程序中添加此方法调用:
public static void printConfigApplied(Configuration conf)
try {
conf.writeXml(System.out);
} catch (final IOException e) {
e.printStackTrace();
}
}
这证明您的系统属性不是从命令行应用的,即 -Dxxx,因此您传递系统属性的方式不正确。因为编程。
由于 job.setnumreducetasks 正在运行,我强烈怀疑您的系统属性未正确传递给驱动程序的下方。
Configuration conf = getConf();
Job job = new Job(conf, conf.get("mapred.job.name"));
将此更改为 this
中的示例
我在Mapper中做HBase扫描,然后Reducer将结果写入HDFS。
mapper输出的记录数大概是1,000,000,000条。
问题是减速器的数量总是一个,虽然我设置了-Dmapred.reduce.tasks=100
。 reduce过程很慢。
// 祝方泽编辑于2016-12-04
我的主要代码 class:
public class GetUrlNotSent2SpiderFromHbase extends Configured implements Tool {
public int run(String[] arg0) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, conf.get("mapred.job.name"));
String input_table = conf.get("input.table");
job.setJarByClass(GetUrlNotSent2SpiderFromHbase.class);
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sitemap_type"));
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("is_send_to_spider"));
TableMapReduceUtil.initTableMapperJob(
input_table,
scan,
GetUrlNotSent2SpiderFromHbaseMapper.class,
Text.class,
Text.class,
job);
/*job.setMapperClass(GetUrlNotSent2SpiderFromHbaseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);*/
job.setReducerClass(GetUrlNotSent2SpiderFromHbaseReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
if (job.waitForCompletion(true) && job.isSuccessful()) {
return 0;
}
return -1;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int res = ToolRunner.run(conf, new GetUrlNotSent2SpiderFromHbase(), args);
System.exit(res);
}
}
这是 运行 此 MapReduce 作业的脚本:
table="xxx"
output="yyy"
sitemap_type="zzz"
JOBCONF=""
JOBCONF="${JOBCONF} -Dmapred.job.name=test_for_scan_hbase"
JOBCONF="${JOBCONF} -Dinput.table=$table"
JOBCONF="${JOBCONF} -Dmapred.output.dir=$output"
JOBCONF="${JOBCONF} -Ddemand.sitemap.type=$sitemap_type"
JOBCONF="${JOBCONF} -Dyarn.app.mapreduce.am.command-opts='-Xmx8192m'"
JOBCONF="${JOBCONF} -Dyarn.app.mapreduce.am.resource.mb=9216"
JOBCONF="${JOBCONF} -Dmapreduce.map.java.opts='-Xmx1536m'"
JOBCONF="${JOBCONF} -Dmapreduce.map.memory.mb=2048"
JOBCONF="${JOBCONF} -Dmapreduce.reduce.java.opts='-Xmx1536m'"
JOBCONF="${JOBCONF} -Dmapreduce.reduce.memory.mb=2048"
JOBCONF="${JOBCONF} -Dmapred.reduce.tasks=100"
JOBCONF="${JOBCONF} -Dmapred.job.priority=VERY_HIGH"
hadoop fs -rmr $output
hadoop jar get_url_not_sent_2_spider_from_hbase_hourly.jar hourly.GetUrlNotSent2SpiderFromHbase $JOBCONF
echo "===== scan HBase finished ====="
我在代码中设置了job.setNumReduceTasks(100);
,它起作用了。
既然你提到只有一个 reduce 在工作,这就是 reducer 非常慢的明显原因。
了解应用于作业的配置属性的统一方式(您为执行的每个作业调用此方法以了解参数是否正确传递):
将以下方法添加到上面提到的作业驱动程序中,以打印从所有可能来源应用的配置条目,即来自 -D 或其他地方请在提交作业之前在驱动程序中添加此方法调用:
public static void printConfigApplied(Configuration conf)
try {
conf.writeXml(System.out);
} catch (final IOException e) {
e.printStackTrace();
}
}
这证明您的系统属性不是从命令行应用的,即 -Dxxx,因此您传递系统属性的方式不正确。因为编程。
由于 job.setnumreducetasks 正在运行,我强烈怀疑您的系统属性未正确传递给驱动程序的下方。
Configuration conf = getConf();
Job job = new Job(conf, conf.get("mapred.job.name"));
将此更改为 this
中的示例