组合器在 HBase 扫描 mapreduce 中为每个区域创建 mapoutput 文件
Combiner creating mapoutput file per region in HBase scan mapreduce
您好,我是 运行 一个从 HBase 读取记录并写入文本文件的应用程序。
我在我的应用程序和自定义分区器中也使用了组合器。
我在我的应用程序中使用了 41 个减速器,因为我需要创建 40 个减速器输出文件来满足我在自定义分区程序中的条件。
一切正常,但当我在我的应用程序中使用组合器时,它会为每个区域或每个映射器创建地图输出文件。
敌人示例我的应用程序中有 40 个区域,因此启动 40 个映射器然后创建 40 个映射输出文件。
但是 reducer 无法组合所有 map 输出并生成最终的 reducer 输出文件,这将是 40 个 reducer 输出文件。
文件中的数据是正确的,但没有文件增加。
知道我怎样才能只获得 reducer 输出文件。
// Reducer Class
job.setCombinerClass(CommonReducer.class);
job.setReducerClass(CommonReducer.class); // reducer class
以下是我的工作详情
Submitted: Mon Apr 10 09:42:55 CDT 2017
Started: Mon Apr 10 09:43:03 CDT 2017
Finished: Mon Apr 10 10:11:20 CDT 2017
Elapsed: 28mins, 17sec
Diagnostics:
Average Map Time 6mins, 13sec
Average Shuffle Time 17mins, 56sec
Average Merge Time 0sec
Average Reduce Time 0sec
这是我的减速器逻辑
import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class CommonCombiner extends Reducer<NullWritable, Text, NullWritable, Text> {
private Logger logger = Logger.getLogger(CommonCombiner.class);
private MultipleOutputs<NullWritable, Text> multipleOutputs;
String strName = "";
private static final String DATA_SEPERATOR = "\|\!\|";
public void setup(Context context) {
logger.info("Inside Combiner.");
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
@Override
public void reduce(NullWritable Key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
final String valueStr = value.toString();
StringBuilder sb = new StringBuilder();
if ("".equals(strName) && strName.length() == 0) {
String[] strArrFileName = valueStr.split(DATA_SEPERATOR);
String strFullFileName[] = strArrFileName[1].split("\|\^\|");
strName = strFullFileName[strFullFileName.length - 1];
String strArrvalueStr[] = valueStr.split(DATA_SEPERATOR);
if (!strArrvalueStr[0].contains(HbaseBulkLoadMapperConstants.FF_ACTION)) {
sb.append(strArrvalueStr[0] + "|!|");
}
multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
context.getCounter(Counters.FILE_DATA_COUNTER).increment(1);
}
}
}
public void cleanup(Context context) throws IOException, InterruptedException {
multipleOutputs.close();
}
}
我已经替换了multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
context.write()
我得到了正确的输出。
您好,我是 运行 一个从 HBase 读取记录并写入文本文件的应用程序。
我在我的应用程序和自定义分区器中也使用了组合器。 我在我的应用程序中使用了 41 个减速器,因为我需要创建 40 个减速器输出文件来满足我在自定义分区程序中的条件。
一切正常,但当我在我的应用程序中使用组合器时,它会为每个区域或每个映射器创建地图输出文件。
敌人示例我的应用程序中有 40 个区域,因此启动 40 个映射器然后创建 40 个映射输出文件。 但是 reducer 无法组合所有 map 输出并生成最终的 reducer 输出文件,这将是 40 个 reducer 输出文件。
文件中的数据是正确的,但没有文件增加。
知道我怎样才能只获得 reducer 输出文件。
// Reducer Class
job.setCombinerClass(CommonReducer.class);
job.setReducerClass(CommonReducer.class); // reducer class
以下是我的工作详情
Submitted: Mon Apr 10 09:42:55 CDT 2017
Started: Mon Apr 10 09:43:03 CDT 2017
Finished: Mon Apr 10 10:11:20 CDT 2017
Elapsed: 28mins, 17sec
Diagnostics:
Average Map Time 6mins, 13sec
Average Shuffle Time 17mins, 56sec
Average Merge Time 0sec
Average Reduce Time 0sec
这是我的减速器逻辑
import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class CommonCombiner extends Reducer<NullWritable, Text, NullWritable, Text> {
private Logger logger = Logger.getLogger(CommonCombiner.class);
private MultipleOutputs<NullWritable, Text> multipleOutputs;
String strName = "";
private static final String DATA_SEPERATOR = "\|\!\|";
public void setup(Context context) {
logger.info("Inside Combiner.");
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
@Override
public void reduce(NullWritable Key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
final String valueStr = value.toString();
StringBuilder sb = new StringBuilder();
if ("".equals(strName) && strName.length() == 0) {
String[] strArrFileName = valueStr.split(DATA_SEPERATOR);
String strFullFileName[] = strArrFileName[1].split("\|\^\|");
strName = strFullFileName[strFullFileName.length - 1];
String strArrvalueStr[] = valueStr.split(DATA_SEPERATOR);
if (!strArrvalueStr[0].contains(HbaseBulkLoadMapperConstants.FF_ACTION)) {
sb.append(strArrvalueStr[0] + "|!|");
}
multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
context.getCounter(Counters.FILE_DATA_COUNTER).increment(1);
}
}
}
public void cleanup(Context context) throws IOException, InterruptedException {
multipleOutputs.close();
}
}
我已经替换了multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
context.write()
我得到了正确的输出。