Reducer Class not working as expected in Hadoop MapReduce
I am trying to implement a simple group-by in MapReduce.
My input file is as follows:
7369,SMITH,CLERK,800,20
7499,ALLEN,SALESMAN,1600,30
7521,WARD,SALESMAN,1250,30
7566,JONES,MANAGER,2975,20
7654,MARTIN,SALESMAN,1250,30
7698,BLAKE,MANAGER,2850,30
7782,CLARK,MANAGER,2450,10
7788,SCOTT,ANALYST,3000,20
7839,KING,PRESIDENT,5000,10
7844,TURNER,SALESMAN,1500,30
7876,ADAMS,CLERK,1100,20
7900,JAMES,CLERK,950,30
7902,FORD,ANALYST,3000,20
7934,MILLER,CLERK,1300,10
My Mapper class:
public class Groupmapper extends Mapper<Object, Text, IntWritable, IntWritable> {
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] parts = line.split(",");
        String token1 = parts[3];
        String token2 = parts[4];
        int deptno = Integer.parseInt(token2);
        int sal = Integer.parseInt(token1);
        context.write(new IntWritable(deptno), new IntWritable(sal));
    }
}
My Reducer class:
public class Groupreducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    IntWritable result = new IntWritable();

    public void Reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
Driver class:
public class Group {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Group");
        job.setJarByClass(Group.class);
        job.setMapperClass(Groupmapper.class);
        job.setCombinerClass(Groupreducer.class);
        job.setReducerClass(Groupreducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The expected output should be:
10 8750
20 10875
30 9400
But it prints the output given below. It does not aggregate the values; it behaves like an identity reducer.
10 1300
10 5000
10 2450
20 1100
20 3000
20 800
20 2975
20 3000
30 1500
30 1600
30 2850
30 1250
30 1250
30 950
The reducer is not working correctly. Why not?
It does look like your reduce method is never being called, so taking a closer look at your reducer is the right next step in debugging.
If you add @Override to your reduce method (as you did on your map method), you will see that you get a "Method does not override method from its superclass" error. That means Hadoop will not use your reduce method; it falls back to the default identity implementation instead.
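For reference, the default reduce in org.apache.hadoop.mapreduce.Reducer is (roughly) the identity pass-through below, which is exactly the behaviour your output shows:

    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
            throws IOException, InterruptedException {
        // Default behaviour: write every (key, value) pair straight through, unaggregated.
        for (VALUEIN value : values) {
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }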
The problem is that you have:
public void Reduce(IntWritable key, Iterable<IntWritable> values, Context context)
It should be:
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
The only difference is that the method name must start with a lowercase r.
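With that one-character rename, and @Override added so the compiler will catch this kind of mismatch in the future, the reducer becomes (a minimal sketch based on your code):

public class Groupreducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    IntWritable result = new IntWritable();

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // Sum all salaries for this department number.
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Since Groupreducer is also registered as the combiner in your driver, this one fix corrects both stages.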