How to override the default sorting of Hadoop
I have a map-reduce job in which the keys are numbers from 1-200. My intended output was (number, value) pairs in numerical order.
But instead I'm getting the output as:
1 value
10 value
11 value
:
:
2 value
20 value
:
:
3 value
I know this is due to the default behavior of Map-Reduce, which sorts the keys in ascending order.
I want my keys to be sorted in numerical order only. How can I achieve this?
If I had to guess, I'd say you are storing your numbers as Text objects rather than IntWritable objects.
Either way, once you have more than one reducer, the items will only be sorted within each reducer, but the output will not be totally sorted.
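If you do need to keep Text keys, one option is to override the sort with a custom comparator that parses the text as a number. The sketch below is my own illustration (the class NumericTextComparator is not part of Hadoop):

package com.Whosebug.answers.mapreduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch only: orders Text keys by their numeric value instead of lexicographically
public class NumericTextComparator extends WritableComparator {

    protected NumericTextComparator() {
        super(Text.class, true); // true = instantiate keys for deserialization
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return Integer.compare(Integer.parseInt(a.toString()),
                Integer.parseInt(b.toString()));
    }
}

You would register it on the job with job.setSortComparatorClass(NumericTextComparator.class). Keep in mind that with more than one reducer this still only sorts within each reducer; a total order across all reducers additionally requires something like Hadoop's TotalOrderPartitioner.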
If the key is an IntWritable, on the other hand, the default WritableComparator in the MapReduce framework will normally handle the numerical ordering for you. My suspicion is that it is getting a Text key, which results in the lexicographical ordering you are seeing. Take a look at the sample code below, which emits values with an IntWritable key:
1) Mapper implementation
package com.Whosebug.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SourceFileMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    private static final String DEFAULT_DELIMITER = "\t";

    private IntWritable keyToEmit = new IntWritable();
    private Text valueToEmit = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line once; the first field is the numeric key, the second the value
        String[] fields = value.toString().split(DEFAULT_DELIMITER);
        keyToEmit.set(Integer.parseInt(fields[0]));
        valueToEmit.set(fields[1]);
        context.write(keyToEmit, valueToEmit);
    }
}
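Given a tab-delimited input line such as "7\tsome-value", this mapper emits the pair (7, some-value) with an IntWritable key, so the shuffle phase will compare the keys numerically.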
2) Reducer implementation
package com.Whosebug.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SourceFileReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Identity reduce: the keys arrive already sorted, so just write the values back out
        for (Text value : values) {
            context.write(key, value);
        }
    }
}
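Note that the reducer does no sorting of its own; the MapReduce framework sorts the IntWritable keys during the shuffle, before reduce() is invoked, so the reducer only has to write each value back out.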
3) Driver implementation
package com.Whosebug.answers.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SourceFileDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Path inputPath = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        // Create configuration
        Configuration conf = new Configuration(true);

        // Create job
        Job job = new Job(conf, "SourceFileDriver");
        job.setJarByClass(SourceFileDriver.class);

        // Set up MapReduce
        job.setMapperClass(SourceFileMapper.class);
        job.setReducerClass(SourceFileReducer.class);
        // A single reducer guarantees one totally sorted output file
        job.setNumReduceTasks(1);

        // Specify key / value types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete the output directory if it already exists
        FileSystem hdfs = FileSystem.get(conf);
        if (hdfs.exists(outputDir)) {
            hdfs.delete(outputDir, true);
        }

        // Execute the job and exit with its status code
        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);
    }
}
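Assuming the three classes above are packaged into a jar (the jar name here is hypothetical), the job can be submitted with:

hadoop jar sourcefile-sort.jar com.Whosebug.answers.mapreduce.SourceFileDriver <input-path> <output-dir>

Since the driver sets the number of reducers to 1, the single output file (part-r-00000) will contain every key in numerical order.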
Thanks!