Transferring Value To Cleanup Function of the Reducer Not Working

I don't understand the output my MapReduce job gives me. I have a .csv file as input which stores the districts of a city and the age of every tree in each district.

In the combiner I try to get the oldest tree per district, while in my reducer I try to retrieve the district that has the oldest tree in the city.

My problem is that while the reduce function gives me the output values 1112165, the cleanup function of the reducer, which should return the last of those (5), actually returns 9 (which is the last value my reducer processed).

I don't understand what I'm missing.

Here is what I have tried so far.

Mapper:

package com.opstty.mapper;

import org.apache.commons.io.output.NullWriter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class TokenizerMapper_1_8_6 extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text result = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        StringTokenizer itr = new StringTokenizer(value.toString(),";");
        int i = 0;
        while (itr.hasMoreTokens()) {
            String arrondissement = itr.nextToken();
            if(i%13==1 && !arrondissement.toString().equals("ARRONDISSEMENT")) {

                itr.nextToken();itr.nextToken();itr.nextToken();
                String annee = itr.nextToken();
                result.set(arrondissement);

                if(Double.parseDouble((String.valueOf(annee))) > 1000){
                    context.write(result, new IntWritable((int) Double.parseDouble((String.valueOf(annee)))));
                    i+=3;
                }
            }
            i++;
        }
    }
}

Combiner:

package com.opstty.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Compare extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List a = new ArrayList();
        int sum = 0;
        for (IntWritable val : values) {
            a.add(val.get());
        }
        Collections.sort(a);
        result.set((Integer) Collections.min(a));
        context.write(key, result);
    }
}

Reducer:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class IntReducer6 extends Reducer<Text, IntWritable, Text, NullWritable> {
    private int max = 100000;
    private int annee=0;
    int i =0;
    private  List a = new ArrayList();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {


        for (IntWritable value : values)
        {
            annee = value.get();
        }

        if(annee < max)
        {
            a.add(key);
            max = annee;
            context.write((Text) a.get(i), NullWritable.get());
            i++;
        }
    }

    @Override
    // only display the character which has the largest value
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write((Text) a.get(a.size()-1), NullWritable.get());
    }
}

Thanks for your help! @Coursal, so here is my mapper:

package com.opstty.mapper;

import org.apache.commons.io.output.NullWriter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class TokenizerMapper_1_8_6 extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text result = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        StringTokenizer itr = new StringTokenizer(value.toString(),";");
        int i = 0;
        while (itr.hasMoreTokens()) {
            String arrondissement = itr.nextToken();
            if(i%13==1 && !arrondissement.toString().equals("ARRONDISSEMENT")) {

                itr.nextToken();itr.nextToken();itr.nextToken();
                String annee = itr.nextToken();
                result.set(arrondissement);

                if(Double.parseDouble((String.valueOf(annee))) > 1000){
                    context.write(result, new IntWritable((int) Double.parseDouble((String.valueOf(annee)))));
                    i+=3;
                }
            }
            i++;
        }
    }
}

and my combiner:

package com.opstty.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Compare extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List a = new ArrayList();
        int sum = 0;
        for (IntWritable val : values) {
            a.add(val.get());
        }
        Collections.sort(a);
        result.set((Integer) Collections.min(a));
        context.write(key, result);
    }
}

The goal: I have a csv file. In the map I get the district and the age of every tree in the city. In my combiner I get the oldest tree of each district. In my reduce I want to print the district that actually has the oldest tree in the city.

As far as MapReduce jobs go, your approach in the reduce (and combiner-reduce) methods is a bit of an overkill for this simple type of job. Honestly, this kind of task doesn't really seem to need a combiner at all, and it certainly cannot use a cleanup function in the reducers, since that function is executed for each one of them.

The main issue with your program is that it doesn't take into account how the reduce function operates: it is executed through multiple instances, one for each key value, or more simply put, the reduce function is called separately for each key. This means that for your type of job, the reduce function should effectively run only once (for all the "keys", as we will see how that works out below) in order to find the district with the oldest tree.
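To see where the stray last value in your output comes from, it helps to look at how the Reducer base class drives these methods. The following is a simplified sketch of its run() template method (not a verbatim copy of the Hadoop source): reduce() is invoked once per distinct key, while setup() and cleanup() run exactly once per reducer task, so anything written in cleanup() reflects whatever state was left over after the last reduce() call.

public void run(Context context) throws IOException, InterruptedException {
    setup(context);                              // once per reducer task
    try {
        while (context.nextKey()) {
            // one reduce() call per distinct key in this task's partition
            reduce(context.getCurrentKey(), context.getValues(), context);
        }
    } finally {
        cleanup(context);                        // once, after the last key
    }
}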

With this in mind, the map function should arrange the data of each line of the input .csv file in such a way that the key of every key-value pair is the same for all pairs (so that the reduce function operates on all rows), and the value of each pair holds the district name and the age of one tree. So the mappers will generate key-value pairs where the NULL value is the key for all of them, and each value is a composite value storing the district name and a particular tree's age, like so:

<NULL, (district, tree_age)>

As for the reduce function, it just needs to scan every value under that NULL key (in other words, all the pairs) and find the maximum tree age. The final output key-value pair then shows the district with the oldest tree and that maximum tree age, like this:

<district_with_oldest_tree, max_tree_age>

In order to show my tested answer, I took some liberties and simplified your program, mostly because the French(?) variable names confused me a bit and because you over-complicated things by strictly using Hadoop-friendly data structures like StringTokenizer, when more recent Hadoop versions support the more common Java data types just fine.

First of all, since I haven't looked at your input .csv file, I created my own trees.csv stored in a directory named trees, with the following lines holding columns for the district and the age of each tree:

District A; 7
District B; 20
District C; 10
District C; 1
District B; 17
District A; 6
District A; 11
District B; 18
District C; 2

In my (all-put-in-one-file-for-the-sake-of-simplicity) program, the @ character is used as a delimiter to separate the data inside the composite values generated by the mappers, and the results are stored in a directory named oldest_tree. You can change this to suit your needs or your own .csv input file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class OldestTree
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <NULL, (district, tree_age)>
     */
    public static class Map extends Mapper<Object, Text, NullWritable, Text> 
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        {
            String row = value.toString();

            String[] columns = row.split("; ");     // split each row by the delimiter
            String district_name = columns[0];
            String tree_age = columns[1];

            // set NULL as key for the generated key-value pairs aimed at the reducers
            // and set the district with each of its trees age as a composite value,
            // with the '@' character as a delimiter
            context.write(NullWritable.get(), new Text(district_name + '@' + tree_age));
        }
    }

    /* input: <NULL, (district, tree_age)>
     * output: <district_with_oldest_tree, max_tree_age>
     */
    public static class Reduce extends Reducer<NullWritable, Text, Text, IntWritable>
    {
        public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        {
            String district_with_oldest_tree = "";
            int max_tree_age = 0;

            // for all the values with the same (NULL) key,
            // aka all the key-value pairs...
            for(Text value : values)
            {
                // split the composite value by the '@' delimiter
                String[] splitted_values = value.toString().split("@");
                String district_name = splitted_values[0];
                int tree_age = Integer.parseInt(splitted_values[1]);

                // find the district with the oldest tree
                if(tree_age > max_tree_age)
                {
                    district_with_oldest_tree = district_name;
                    max_tree_age = tree_age;
                }
            }

            // output the district (key) with the oldest tree's year of planting (value)
            // to the output directory
            context.write(new Text(district_with_oldest_tree), new IntWritable(max_tree_age));
        }
    }

    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("trees");
        Path output_dir = new Path("oldest_tree");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job oldesttree_job = Job.getInstance(conf, "Oldest Tree");
        oldesttree_job.setJarByClass(OldestTree.class);
        oldesttree_job.setMapperClass(Map.class);
        oldesttree_job.setReducerClass(Reduce.class);    
        oldesttree_job.setMapOutputKeyClass(NullWritable.class);
        oldesttree_job.setMapOutputValueClass(Text.class);
        oldesttree_job.setOutputKeyClass(Text.class);
        oldesttree_job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(oldesttree_job, input_dir);
        FileOutputFormat.setOutputPath(oldesttree_job, output_dir);
        oldesttree_job.waitForCompletion(true);
    }
}
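One optional tweak, which is my addition and not part of the program above: because every mapper output shares the same NULL key, all pairs already end up in a single reduce() call, but you can make that single-reducer assumption explicit in the job configuration if you want to, for example right before waitForCompletion:

// optional: pin the job to a single reduce task; the shared NULL key
// already routes every pair to one reduce() call anyway
oldesttree_job.setNumReduceTasks(1);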

So the results of the program inside the oldest_tree directory (as seen through the Hadoop HDFS browser) show the district with the oldest tree and that tree's age; with the sample trees.csv above, that comes out to District B with a tree age of 20.

It works perfectly, thank you!

The reason I used a combiner is the way my teacher phrased the question:

Write a MapReduce job that displays the district where the oldest tree is. The mapper must extract the age and district of each tree. The problem is, this information can’t be used as keys and values (why?). You will need to define a subclass of Writable to contain both information. The reducer should consolidate all this data and only output district.

I'm new to map/reduce and I didn't really understand what a subclass of Writable is, so I searched online and found some topics about combiners and WritableCompare.
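For reference, the subclass of Writable that the assignment hints at is just a small container class that knows how to serialize and deserialize its own fields, so that the district and the tree age can travel through the shuffle together as a single value. A minimal sketch might look like the following (the class and field names are my own, made up for illustration):

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Holds a district name together with one tree's age so that both
// pieces of information move through MapReduce as a single value.
public class DistrictTreeWritable implements Writable {
    private String district;
    private int treeAge;

    // Hadoop instantiates Writables via reflection, so a no-arg constructor is required
    public DistrictTreeWritable() {}

    public DistrictTreeWritable(String district, int treeAge) {
        this.district = district;
        this.treeAge = treeAge;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(district);   // serialize the fields in a fixed order
        out.writeInt(treeAge);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        district = in.readUTF();  // read them back in the same order
        treeAge = in.readInt();
    }

    public String getDistrict() { return district; }
    public int getTreeAge()     { return treeAge; }
}

If such a composite object were used as a key rather than a value, it would have to implement WritableComparable instead, so that Hadoop can sort and group it during the shuffle.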