如何为这种场景编写 MapReduce 代码?

How to write MapReduce code for such scenario?

假设我有一个输入文件如下

dept_id emp_id  salary
1       13611   1234
2       13609   3245
3       13612   3251
2       13623   1232
1       13619   6574
3       13421   234

现在我想求出每个部门的平均工资。像下面的 Hive 查询:

SELECT dept_id, avg(salary) FROM dept GROUP BY dept_id

这将 return 输出:

dept_id avg_sal
----------------
  1     3904.0
  2     2238.5
  3     1742.5

现在,我要做的是生成相同的输出,但使用 mapreduce 框架。那么怎么写呢?提前致谢!

IMPORTANT: Before attempting to implement this, first try some basic examples in MapReduce, like implementing a word count program, to understand the logic and even before that, read a book or a tutorial about how MapReduce works.

聚合东西(比如求平均值)的想法是在映射阶段按键(部门 ID)分组,然后在减少阶段减少特定部门的所有工资。

更形式化的方式:

地图:

input:代表工资记录的一行(即dep_id、emp_id、salary)
输出(键,值):(dep_id,薪水)

减少:

input (key, values): (dep_id, salaries:list of salary values having this dep_id)
输出(键,值):(dep_id,平均(工资))

这样,属于同一个部门的所有工资将由同一个reducer处理。您在减速器中所要做的就是找到输入值的平均值。

如果您还没有参加过任何培训计划,请访问 edureka 在 you tube 上提供的免费视频以更好地理解概念:Map Reduce

映射器

Mapper 将输入 key/value 对映射到一组中间 key/value 对。

映射是将输入记录转换为中间记录的单个任务。转换后的中间记录不需要与输入记录属于同一类型。给定的输入对可能映射到零个或多个输出对。

减速机

Reducer 将共享一个键的一组中间值缩减为较小的一组值。

作业的reduce数量由用户通过Job.setNumReduceTasks(int)设置。

Apache Hadoop 网站中的工作示例关于:Word Count example

对于您的用例,仅使用字数统计示例是不够的。 由于您使用的是 Group by,因此您必须在 Mapper 上使用 Combiner 和 partitioners。访问此视频:Advanced Map reduce

代码----

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageSalary {
  public static class AvgMapper
       extends Mapper<Object, Text, Text, FloatWritable>{
    private Text dept_id = new Text();
    private FloatWritable salary = new FloatWritable(); 
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
        String values[] = value.toString().split("\t");
        dept_id.set(values[0]);
        salary.set(Float.parseFloat(values[2]));
        context.write(dept_id, salary);
    }
  }

  public static class AvgReducer
       extends Reducer<Text,FloatWritable,Text,FloatWritable> {
    private FloatWritable result = new FloatWritable();

    public void reduce(Text key, Iterable<FloatWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      float sum = 0;
      float count = 0;
      for (FloatWritable val : values) {
        sum += val.get();
        count++;
      }
      result.set(sum/count);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "average salary");
    job.setJarByClass(AverageSalary.class);
    job.setMapperClass(AvgMapper.class);
    job.setCombinerClass(AvgReducer.class);
    job.setReducerClass(AvgReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    FileInputFormat.addInputPath(job, new Path("/home/kishore/Data/mapreduce.txt"));  // input path
    FileOutputFormat.setOutputPath(job, new Path("/home/kishore/Data/map3")); // output path
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

output 
1   3904.0
2   2238.5
3   1742.5