How to override the default sorting of Hadoop

I have a map-reduce job whose keys are numbers from 1 to 200. My expected output is (number, value) pairs in numerical order, but instead I am getting output like this:

1    value
10   value
11   value
   :
   : 
2    value
20   value
   :
   :
3    value

I understand this happens because the default behavior of Map-Reduce is to sort the keys in ascending order.

I want my keys to be sorted in numerical order instead. How can I achieve this?

If I had to guess, I would say you are storing your numbers as Text objects rather than IntWritable objects.

Either way, once you have more than one reducer, only the items within each reducer get sorted; you do not get a total ordering across reducers. (The driver below uses a single reducer, so its one output file will be fully sorted; a sketch for keeping the output ordered across several reducers follows.)
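If you do need several reducers and still want the concatenated output files to be in order, one option is a custom Partitioner that assigns contiguous key ranges to the reducers. Here is a minimal sketch, assuming the keys stay within the 1-200 range from your question (the class name SortedRangePartitioner is my own, not part of Hadoop):

package com.Whosebug.answers.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of a range partitioner: keys 1-200 are spread over numPartitions
// contiguous buckets, so reducer i only sees keys smaller than those of reducer i+1.
public class SortedRangePartitioner extends Partitioner<IntWritable, Text> {

    private static final int MAX_KEY = 200;

    @Override
    public int getPartition(IntWritable key, Text value, int numPartitions) {
        // Map keys 1..MAX_KEY onto partitions 0..numPartitions-1, keeping ranges contiguous
        int partition = (key.get() - 1) * numPartitions / MAX_KEY;
        // Clamp keys that fall outside the expected range
        return Math.min(Math.max(partition, 0), numPartitions - 1);
    }

}

You would register it in the driver with job.setPartitionerClass(SortedRangePartitioner.class); with a single reducer (as in the driver below) this is not needed.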

If the key is an IntWritable, the default WritableComparator in the MapReduce framework will normally handle the numerical ordering for you. My suspicion is that it is receiving a Text key, which gives you the lexicographical ordering you are seeing. Please take a look at the sample code below, which emits values with an IntWritable key:

1) Mapper implementation

package com.Whosebug.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SourceFileMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    private static final String DEFAULT_DELIMITER = "\t";

    private IntWritable keyToEmit = new IntWritable();
    private Text valueToEmit = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the tab-delimited line once and emit the first field as an IntWritable key,
        // so the framework sorts it numerically rather than lexicographically.
        String[] fields = value.toString().split(DEFAULT_DELIMITER);
        keyToEmit.set(Integer.parseInt(fields[0]));
        valueToEmit.set(fields[1]);
        context.write(keyToEmit, valueToEmit);
    }

}
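The mapper assumes each input line looks like number<TAB>value, for example (hypothetical, tab-separated data):

17    value-a
3     value-b
102   value-c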

2) Reducer implementation

package com.Whosebug.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SourceFileReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException,
            InterruptedException {
        // Identity reduce: write each value back out with its (numerically sorted) key
        for (Text value : values) {
            context.write(key, value);
        }
    }

}
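With a single reducer and TextOutputFormat, the resulting part-r-00000 file should then contain tab-separated pairs in ascending numerical key order, e.g.:

1    value
2    value
3    value
     :
10   value
11   value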

3) Driver implementation

package com.Whosebug.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SourceFileDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        Path inputPath = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        // Create configuration
        Configuration conf = new Configuration(true);

        // Create job
        Job job = new Job(conf, "SourceFileDriver");
        job.setJarByClass(SourceFileDriver.class);

        // Setup MapReduce
        job.setMapperClass(SourceFileMapper.class);
        job.setReducerClass(SourceFileReducer.class);
        // A single reducer gives one globally sorted output file
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete output if exists
        FileSystem hdfs = FileSystem.get(conf);
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute job
        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);

    }

}
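If changing the key type is not an option, you can also keep Text keys and override the sort order itself. A minimal sketch (the class name NumericTextComparator is my own, not part of Hadoop):

package com.Whosebug.answers.mapreduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch of a comparator that sorts Text keys by their numeric value
// instead of lexicographically.
public class NumericTextComparator extends WritableComparator {

    public NumericTextComparator() {
        // Register Text as the key class and create instances for compare()
        super(Text.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return Integer.compare(Integer.parseInt(a.toString()), Integer.parseInt(b.toString()));
    }

}

Register it in the driver with job.setSortComparatorClass(NumericTextComparator.class); in that case the mapper keeps emitting Text keys. The IntWritable approach above is simpler, though.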

Thanks!