MapReduce 扫描 HBase 时,Reducer 的个数始终为一个

during HBase scan with MapReduce, the number of Reducer is always one

我在Mapper中做HBase扫描,然后Reducer将结果写入HDFS。
mapper输出的记录数大概是1,000,000,000条。

问题是减速器的数量总是一个,虽然我设置了-Dmapred.reduce.tasks=100。 reduce过程很慢。

// 祝方泽编辑于2016-12-04
我的主要代码 class:

public class GetUrlNotSent2SpiderFromHbase extends Configured implements Tool {

public int run(String[] arg0) throws Exception {

    Configuration conf = getConf();
    Job job = new Job(conf, conf.get("mapred.job.name"));
    String input_table = conf.get("input.table");       

    job.setJarByClass(GetUrlNotSent2SpiderFromHbase.class);

    Scan scan = new Scan();
    scan.setCaching(500);
    scan.setCacheBlocks(false);
    scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sitemap_type"));
    scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("is_send_to_spider"));

    TableMapReduceUtil.initTableMapperJob(
            input_table, 
            scan, 
            GetUrlNotSent2SpiderFromHbaseMapper.class, 
            Text.class, 
            Text.class, 
            job);

    /*job.setMapperClass(GetUrlNotSent2SpiderFromHbaseMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);*/

    job.setReducerClass(GetUrlNotSent2SpiderFromHbaseReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    if (job.waitForCompletion(true) && job.isSuccessful()) {
        return 0;
    }
    return -1;
}

public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int res = ToolRunner.run(conf, new GetUrlNotSent2SpiderFromHbase(), args);
    System.exit(res);
}

}

这是 运行 此 MapReduce 作业的脚本:

table="xxx"
output="yyy"
sitemap_type="zzz"

JOBCONF=""
JOBCONF="${JOBCONF} -Dmapred.job.name=test_for_scan_hbase"
JOBCONF="${JOBCONF} -Dinput.table=$table"
JOBCONF="${JOBCONF} -Dmapred.output.dir=$output"
JOBCONF="${JOBCONF} -Ddemand.sitemap.type=$sitemap_type"
JOBCONF="${JOBCONF} -Dyarn.app.mapreduce.am.command-opts='-Xmx8192m'"
JOBCONF="${JOBCONF} -Dyarn.app.mapreduce.am.resource.mb=9216"
JOBCONF="${JOBCONF} -Dmapreduce.map.java.opts='-Xmx1536m'"
JOBCONF="${JOBCONF} -Dmapreduce.map.memory.mb=2048"
JOBCONF="${JOBCONF} -Dmapreduce.reduce.java.opts='-Xmx1536m'"
JOBCONF="${JOBCONF} -Dmapreduce.reduce.memory.mb=2048"
JOBCONF="${JOBCONF} -Dmapred.reduce.tasks=100"
JOBCONF="${JOBCONF} -Dmapred.job.priority=VERY_HIGH"

hadoop fs -rmr $output
hadoop jar get_url_not_sent_2_spider_from_hbase_hourly.jar hourly.GetUrlNotSent2SpiderFromHbase $JOBCONF
echo "===== scan HBase finished ====="

我在代码中设置了job.setNumReduceTasks(100);,它起作用了。

既然你提到只有一个 reduce 在工作,这就是 reducer 非常慢的明显原因。

了解应用于作业的配置属性的统一方式(您为执行的每个作业调用此方法以了解参数是否正确传递):

将以下方法添加到上面提到的作业驱动程序中,以打印从所有可能来源应用的配置条目,即来自 -D 或其他地方请在提交作业之前在驱动程序中添加此方法调用:

public static void printConfigApplied(Configuration conf) 
     try {
                conf.writeXml(System.out);
            } catch (final IOException e) {
                e.printStackTrace();
            }
}

这证明您的系统属性不是从命令行应用的,即 -Dxxx,因此您传递系统属性的方式不正确。因为编程。

由于 job.setnumreducetasks 正在运行,我强烈怀疑您的系统属性未正确传递给驱动程序的下方。

 Configuration conf = getConf();
    Job job = new Job(conf, conf.get("mapred.job.name"));

将此更改为 this

中的示例