Transferring Value to Cleanup Function of the Reducer Not Working
I don't understand the output my MapReduce job gives me. I have a .csv file as input, which stores the districts of a city and the age of every tree in each district.
In the combiner I try to get the oldest tree per district, and in my reducer I try to retrieve the district of the city that has the oldest tree.
My problem is that while the reduce function gives me the output values 11, 12, 16 and 5, the cleanup function in the reducer, which should return 5 (the minimum of those), actually returns 9 (which is the last value my reducer processed).
I don't understand what I'm missing.
Here is what I have tried so far.
Mapper:
package com.opstty.mapper;

import org.apache.commons.io.output.NullWriter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class TokenizerMapper_1_8_6 extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text result = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString(), ";");
        int i = 0;

        while (itr.hasMoreTokens()) {
            String arrondissement = itr.nextToken();

            if (i % 13 == 1 && !arrondissement.toString().equals("ARRONDISSEMENT")) {
                itr.nextToken(); itr.nextToken(); itr.nextToken();
                String annee = itr.nextToken();
                result.set(arrondissement);

                if (Double.parseDouble(String.valueOf(annee)) > 1000) {
                    context.write(result, new IntWritable((int) Double.parseDouble(String.valueOf(annee))));
                    i += 3;
                }
            }
            i++;
        }
    }
}
Combiner:
package com.opstty.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Compare extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List a = new ArrayList();
        int sum = 0;

        for (IntWritable val : values) {
            a.add(val.get());
        }

        Collections.sort(a);
        result.set((Integer) Collections.min(a));
        context.write(key, result);
    }
}
Reducer:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class IntReducer6 extends Reducer<Text, IntWritable, Text, NullWritable> {
    private int max = 100000;
    private int annee = 0;
    int i = 0;
    private List a = new ArrayList();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable value : values) {
            annee = value.get();
        }

        if (annee < max) {
            a.add(key);
            max = annee;
            context.write((Text) a.get(i), NullWritable.get());
            i++;
        }
    }

    @Override
    // only display the character which has the largest value
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write((Text) a.get(a.size() - 1), NullWritable.get());
    }
}
Thanks for your help! My mapper and combiner are the ones posted above.
The goal: I have a csv file. In the map, I get the district and the age of each tree in the city.
In my combiner, I get the oldest tree of each district.
In my reduce, I want to print the district that actually has the oldest tree in the city.
As far as MapReduce jobs go, your approach in the reduce (and combiner-reduce) methods is a bit of overkill for this simple type of task. To be honest, this kind of task doesn't seem to need a combiner at all, and it certainly cannot rely on the reducers' cleanup function, since that function is executed once for each of the reducer tasks.
The main problem with your program is that it doesn't take into account how the reduce function operates: it is executed through multiple instances of itself, one for each key value, or, more simply put, the reduce function is called separately for each key. This means that for your type of job, the reduce function should effectively run only once (for all the "keys", we'll see how below) in order to find the district with the oldest tree.
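To make that concrete, here is a minimal skeleton (the class name is made up for illustration and is not part of your job) showing when Hadoop invokes each method of a Reducer; it also hints at why collecting keys in a list across reduce() calls and reading them back in cleanup() is fragile:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical skeleton, only meant to illustrate the Reducer lifecycle.
public class LifecycleSketchReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void setup(Context context) {
        // runs once per reducer task, before any key is processed
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // runs once per distinct key assigned to this reducer task;
        // note that Hadoop reuses the same Text/IntWritable objects between calls,
        // so storing the `key` reference in a list leaves every element
        // pointing at whatever that single object holds after the last call
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // runs once per reducer task, after its last reduce() call;
        // with several reducer tasks it therefore runs once in each of them
    }
}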
With that in mind, the map function should arrange the data of each row of the input .csv file in such a way that the key of every key-value pair is the same for all pairs (so that the reduce function operates on all of the rows), and the value of each pair holds the district name and the age of one tree. So the mappers will generate key-value pairs where a NULL value is the key for all of them, and each value is a composite value storing a district name and a particular tree age, like so:
<NULL, (district, tree_age)>
As for the reduce function, it only needs to scan every value under the NULL key (aka all of the pairs) and find the maximum tree age. The final output key-value pair then shows the district with the oldest tree and that maximum tree age, like so:
<district_with_oldest_tree, max_tree_age>
To showcase my tested answer, I took some liberties to simplify your program, mainly because the French(?)-named variables confused me a bit and because you over-complicate things by strictly using Hadoop-friendly data structures like StringTokenizer, while more recent Hadoop versions work just fine with the more common Java types.
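As a quick illustration of that last point (the sample row and class name below are made up, not taken from the actual dataset), both approaches extract the same fields from a ';'-separated line, but the split version lets you address columns by index instead of consuming tokens in order:

import java.util.StringTokenizer;

public class SplitVsTokenizer {
    public static void main(String[] args) {
        // made-up sample row, just to contrast the two parsing styles
        String row = "ARRONDISSEMENT X;foo;bar;baz;1985";

        // StringTokenizer: tokens must be consumed strictly in order
        StringTokenizer itr = new StringTokenizer(row, ";");
        String districtViaTokenizer = itr.nextToken();      // "ARRONDISSEMENT X"

        // String.split: columns are addressable by index
        String[] columns = row.split(";");
        String districtViaSplit = columns[0];                // "ARRONDISSEMENT X"
        String yearViaSplit = columns[4];                    // "1985"

        System.out.println(districtViaTokenizer + " / " + districtViaSplit + " / " + yearViaSplit);
    }
}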
First of all, since I haven't seen your input .csv file, I created my own trees.csv, stored in a directory named trees, with the following rows holding a district column and a tree-age column:
District A; 7
District B; 20
District C; 10
District C; 1
District B; 17
District A; 6
District A; 11
District B; 18
District C; 2
In my (all-put-in-one-file-for-the-sake-of-simplicity) program, the @ character is used as a delimiter to separate the data inside the composite values generated by the mappers, and the results are stored in a directory named oldest_tree. You can change this according to your needs or to your own .csv input file.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class OldestTree
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <NULL, (district, tree_age)>
     */
    public static class Map extends Mapper<Object, Text, NullWritable, Text>
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String row = value.toString();
            String[] columns = row.split("; ");   // split each row by the delimiter
            String district_name = columns[0];
            String tree_age = columns[1];

            // set NULL as key for the generated key-value pairs aimed at the reducers
            // and set the district with each of its trees' age as a composite value,
            // with the '@' character as a delimiter
            context.write(NullWritable.get(), new Text(district_name + '@' + tree_age));
        }
    }

    /* input:  <NULL, (district, tree_age)>
     * output: <district_with_oldest_tree, max_tree_age>
     */
    public static class Reduce extends Reducer<NullWritable, Text, Text, IntWritable>
    {
        public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            String district_with_oldest_tree = "";
            int max_tree_age = 0;

            // for all the values with the same (NULL) key,
            // aka all the key-value pairs...
            for(Text value : values)
            {
                // split the composite value by the '@' delimiter
                String[] splitted_values = value.toString().split("@");
                String district_name = splitted_values[0];
                int tree_age = Integer.parseInt(splitted_values[1]);

                // find the district with the oldest tree
                if(tree_age > max_tree_age)
                {
                    district_with_oldest_tree = district_name;
                    max_tree_age = tree_age;
                }
            }

            // output the district (key) with the oldest tree's age (value)
            // to the output directory
            context.write(new Text(district_with_oldest_tree), new IntWritable(max_tree_age));
        }
    }

    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("trees");
        Path output_dir = new Path("oldest_tree");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job oldesttree_job = Job.getInstance(conf, "Oldest Tree");
        oldesttree_job.setJarByClass(OldestTree.class);
        oldesttree_job.setMapperClass(Map.class);
        oldesttree_job.setReducerClass(Reduce.class);
        oldesttree_job.setMapOutputKeyClass(NullWritable.class);
        oldesttree_job.setMapOutputValueClass(Text.class);
        oldesttree_job.setOutputKeyClass(Text.class);
        oldesttree_job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(oldesttree_job, input_dir);
        FileOutputFormat.setOutputPath(oldesttree_job, output_dir);
        oldesttree_job.waitForCompletion(true);
    }
}
So the result of the program inside the oldest_tree directory (as seen through the Hadoop HDFS browser) is the district with the oldest tree along with that tree's age, which for the sample input above is:
District B    20
It works perfectly, thank you!
The reason I used a combiner is the way my teacher phrased the exercise:
Write a MapReduce job that displays the district where the oldest tree is. The mapper must extract the age and district of each tree.
The problem is, this information can’t be used as keys and values (why?). You will need to define a subclass of Writable to contain both information.
The reducer should consolidate all this data and only output district.
I'm new to map/reduce and I didn't really understand what a subclass of Writable is. So I searched online and found some topics about combiners and WritableComparable.
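For what it's worth, a subclass of Writable in that exercise would just be a small container for the two fields the mapper extracts, so the pair can travel as a single Hadoop value. A minimal sketch, with a hypothetical class name (DistrictTreeWritable) and the usual Writable contract, might look like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical composite value holding a district name and a tree age,
// so both pieces of information can be emitted together by the mapper.
public class DistrictTreeWritable implements Writable {
    private String district;
    private int treeAge;

    // Hadoop needs a no-argument constructor to instantiate the value during deserialization
    public DistrictTreeWritable() {}

    public DistrictTreeWritable(String district, int treeAge) {
        this.district = district;
        this.treeAge = treeAge;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(district);   // serialize the district name
        out.writeInt(treeAge);    // serialize the tree age
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        district = in.readUTF();  // deserialize in the same order as write()
        treeAge = in.readInt();
    }

    public String getDistrict() { return district; }
    public int getTreeAge() { return treeAge; }
}

With something like that in place, the mapper could emit <NullWritable, DistrictTreeWritable> pairs instead of the '@'-delimited Text values used in the answer above; WritableComparable is the variant you would need if the type also had to serve as a key and therefore be sortable.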