如何删除组合器输出并仅保留 mapreduce 最终输出中的 reducer 输出

How to remove combiner output and keep only reducer output in mapreduce final output

您好,我是 运行 一个从 HBase 读取记录并写入文本文件的应用程序。

我在我的应用程序和自定义分区器中也使用了组合器。我在我的应用程序中使用了 41 个减速器,因为我需要创建 40 个减速器输出文件来满足我在自定义分区程序中的条件。

一切正常,但是当我在我的应用程序中使用组合器时,它会为每个区域或每个映射器创建地图输出文件。

敌人示例我的应用程序中有 40 个区域,因此启动了 40 个映射器,然后它创建了 40 个映射输出文件。但是 reducer 无法组合所有 map 输出并生成最终的 reducer 输出文件,这将是 40 个 reducer 输出文件。

文件中的数据是正确的,但没有文件增加。

知道我怎样才能只获得 reducer 输出文件。

import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class CommonCombiner extends Reducer<NullWritable, Text, NullWritable, Text> {

    private Logger logger = Logger.getLogger(CommonCombiner.class);
    private MultipleOutputs<NullWritable, Text> multipleOutputs;
    String strName = "";
    private static final String DATA_SEPERATOR = "\|\!\|";

    public void setup(Context context) {
        logger.info("Inside Combiner.");
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    public void reduce(NullWritable Key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        for (Text value : values) {
            final String valueStr = value.toString();
            StringBuilder sb = new StringBuilder();
            if ("".equals(strName) && strName.length() == 0) {
                String[] strArrFileName = valueStr.split(DATA_SEPERATOR);
                String strFullFileName[] = strArrFileName[1].split("\|\^\|");

                strName = strFullFileName[strFullFileName.length - 1];


                String strArrvalueStr[] = valueStr.split(DATA_SEPERATOR);
                if (!strArrvalueStr[0].contains(HbaseBulkLoadMapperConstants.FF_ACTION)) {
                    sb.append(strArrvalueStr[0] + "|!|");
                }
                multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
                context.getCounter(Counters.FILE_DATA_COUNTER).increment(1);


            }

        }
    }


    public void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}

让我们弄清楚基础知识

  1. Combiner 是一种优化,可以 运行 在 mapper 和 reduce 中(reduce 的合并阶段)(fetch merge reduce phases)。

  2. 找出数据中键的分布,给定的映射器是否访问了很多相同的键,如果是,则组合器有帮助,否则它没有效果。

  3. 1 K regions no where 保证均分。您有一些热点区域

  4. 找到热点区域并拆分。

请关注:http://bytepadding.com/big-data/map-reduce/understanding-map-reduce-the-missing-guide/

您没有从组合器输出任何数据供减速器使用。在您的组合器中,您正在使用:

multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);

这不是您将数据写出以供阶段之间使用的方式,即从映射器或组合器到减少阶段。你应该使用:

context.write()

MultipleOutputs 只是一种将额外文件写入需要多个文件的磁盘的方法。我从未见过它用于组合器。