如何为这种场景编写 MapReduce 代码?
How to write MapReduce code for such scenario?
假设我有一个输入文件如下
dept_id emp_id salary
1 13611 1234
2 13609 3245
3 13612 3251
2 13623 1232
1 13619 6574
3 13421 234
现在我想求出每个部门的平均工资。像下面的 Hive 查询:
SELECT dept_id, avg(salary) FROM dept GROUP BY dept_id
这将 return 输出:
dept_id avg_sal
----------------
1 3904.0
2 2238.5
3 1742.5
现在,我要做的是生成相同的输出,但使用 mapreduce 框架。那么怎么写呢?提前致谢!
IMPORTANT:
Before attempting to implement this, first try some basic examples in MapReduce, like implementing a word count program, to understand the logic and even before that, read a book or a tutorial about how MapReduce works.
聚合东西(比如求平均值)的想法是在映射阶段按键(部门 ID)分组,然后在减少阶段减少特定部门的所有工资。
更形式化的方式:
地图:
input:代表工资记录的一行(即dep_id、emp_id、salary)
输出(键,值):(dep_id,薪水)
减少:
input (key, values): (dep_id, salaries:list of salary values having this dep_id)
输出(键,值):(dep_id,平均(工资))
这样,属于同一个部门的所有工资将由同一个reducer处理。您在减速器中所要做的就是找到输入值的平均值。
如果您还没有参加过任何培训计划,请访问 edureka 在 you tube 上提供的免费视频以更好地理解概念:Map Reduce
映射器
Mapper 将输入 key/value 对映射到一组中间 key/value 对。
映射是将输入记录转换为中间记录的单个任务。转换后的中间记录不需要与输入记录属于同一类型。给定的输入对可能映射到零个或多个输出对。
减速机
Reducer 将共享一个键的一组中间值缩减为较小的一组值。
作业的reduce数量由用户通过Job.setNumReduceTasks(int)设置。
Apache Hadoop 网站中的工作示例关于:Word Count example
对于您的用例,仅使用字数统计示例是不够的。
由于您使用的是 Group by,因此您必须在 Mapper 上使用 Combiner 和 partitioners。访问此视频:Advanced Map reduce
代码----
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AverageSalary {
public static class AvgMapper
extends Mapper<Object, Text, Text, FloatWritable>{
private Text dept_id = new Text();
private FloatWritable salary = new FloatWritable();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String values[] = value.toString().split("\t");
dept_id.set(values[0]);
salary.set(Float.parseFloat(values[2]));
context.write(dept_id, salary);
}
}
public static class AvgReducer
extends Reducer<Text,FloatWritable,Text,FloatWritable> {
private FloatWritable result = new FloatWritable();
public void reduce(Text key, Iterable<FloatWritable> values,
Context context
) throws IOException, InterruptedException {
float sum = 0;
float count = 0;
for (FloatWritable val : values) {
sum += val.get();
count++;
}
result.set(sum/count);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "average salary");
job.setJarByClass(AverageSalary.class);
job.setMapperClass(AvgMapper.class);
job.setCombinerClass(AvgReducer.class);
job.setReducerClass(AvgReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path("/home/kishore/Data/mapreduce.txt")); // input path
FileOutputFormat.setOutputPath(job, new Path("/home/kishore/Data/map3")); // output path
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
output
1 3904.0
2 2238.5
3 1742.5
假设我有一个输入文件如下
dept_id emp_id salary
1 13611 1234
2 13609 3245
3 13612 3251
2 13623 1232
1 13619 6574
3 13421 234
现在我想求出每个部门的平均工资。像下面的 Hive 查询:
SELECT dept_id, avg(salary) FROM dept GROUP BY dept_id
这将 return 输出:
dept_id avg_sal
----------------
1 3904.0
2 2238.5
3 1742.5
现在,我要做的是生成相同的输出,但使用 mapreduce 框架。那么怎么写呢?提前致谢!
IMPORTANT: Before attempting to implement this, first try some basic examples in MapReduce, like implementing a word count program, to understand the logic and even before that, read a book or a tutorial about how MapReduce works.
聚合东西(比如求平均值)的想法是在映射阶段按键(部门 ID)分组,然后在减少阶段减少特定部门的所有工资。
更形式化的方式:
地图:
input:代表工资记录的一行(即dep_id、emp_id、salary)
输出(键,值):(dep_id,薪水)
减少:
input (key, values): (dep_id, salaries:list of salary values having this dep_id)
输出(键,值):(dep_id,平均(工资))
这样,属于同一个部门的所有工资将由同一个reducer处理。您在减速器中所要做的就是找到输入值的平均值。
如果您还没有参加过任何培训计划,请访问 edureka 在 you tube 上提供的免费视频以更好地理解概念:Map Reduce
映射器
Mapper 将输入 key/value 对映射到一组中间 key/value 对。
映射是将输入记录转换为中间记录的单个任务。转换后的中间记录不需要与输入记录属于同一类型。给定的输入对可能映射到零个或多个输出对。
减速机
Reducer 将共享一个键的一组中间值缩减为较小的一组值。
作业的reduce数量由用户通过Job.setNumReduceTasks(int)设置。
Apache Hadoop 网站中的工作示例关于:Word Count example
对于您的用例,仅使用字数统计示例是不够的。 由于您使用的是 Group by,因此您必须在 Mapper 上使用 Combiner 和 partitioners。访问此视频:Advanced Map reduce
代码----
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AverageSalary {
public static class AvgMapper
extends Mapper<Object, Text, Text, FloatWritable>{
private Text dept_id = new Text();
private FloatWritable salary = new FloatWritable();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String values[] = value.toString().split("\t");
dept_id.set(values[0]);
salary.set(Float.parseFloat(values[2]));
context.write(dept_id, salary);
}
}
public static class AvgReducer
extends Reducer<Text,FloatWritable,Text,FloatWritable> {
private FloatWritable result = new FloatWritable();
public void reduce(Text key, Iterable<FloatWritable> values,
Context context
) throws IOException, InterruptedException {
float sum = 0;
float count = 0;
for (FloatWritable val : values) {
sum += val.get();
count++;
}
result.set(sum/count);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "average salary");
job.setJarByClass(AverageSalary.class);
job.setMapperClass(AvgMapper.class);
job.setCombinerClass(AvgReducer.class);
job.setReducerClass(AvgReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path("/home/kishore/Data/mapreduce.txt")); // input path
FileOutputFormat.setOutputPath(job, new Path("/home/kishore/Data/map3")); // output path
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
output
1 3904.0
2 2238.5
3 1742.5