试图从文本文件中显示城市的最高温度,但我拥有所有城市和温度

Trying to show the max temperature of a city from a text file but Im having all cities and temperatures

我正在尝试开发一个 mapreduce 程序来显示文本文件中最高温度的城市。

文本文件"temperatures.txt"有这个内容(城市和温度):

城市1 10

城市 2 11

城市 3 4

城市4 20

...

city10000 22

在这个例子中,我想要的结果是打印最后一行,它有更高的温度:

city10000 22

我有这样的减速器文件:

import sys

current_city = None
current_max = 0
city = None

for line in sys.stdin:
    line = line.strip()

    city, temperature = line.rsplit('\t', 1)

    try:
        temperature = float(temperature)
    except ValueError:
        continue

    if current_city == city:
        if temperature > current_max:
            current_max = temperature
    else:
        if current_city:
            print '%s\t%s' % (current_city, current_max)
        current_max = temperature
        current_city = city

if current_city == city:
    print '%s\t%s' % (current_city, current_max)

但是,当我测试这个 reducer.py 文件时,我总是得到相同的结果,我总是得到所有城市和温度,就像这样:

城市1 10

城市 2 11

城市 3 4

城市4 20

...

city10000 22

你看到我的 reducer 文件有什么问题吗?

我只想显示最高温度的城市,在这种情况下最高温度的城市是 city10000,所以我只想要这个结果:

city10000 22

我对python了解不多。但您可以按照以下场景操作:

-> 创建一个地图,将键存储为城市,将值存储为温度。

-> 现在,将前五个城市及其温度存储在地图中。

-> 5个城市后,将每个城市的温度与地图上所有5个城市的温度进行比较。如果地图中任何城市的温度低于此值,则用新城市替换该城市及其温度。

-> 最后,您可以打印地图。这将得到温度最高的 5 个城市。

import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;

 import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

 public class Weather {

  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         String line = value.toString();
           String year = line.substring(15, 19);

           int airTemperature;
           if (line.charAt(87) == '+') {
                        airTemperature= Integer.parseInt(line.substring(88, 92));
                    }
           else
               airTemperature= - Integer.parseInt(line.substring(88, 92));

                if(airTemperature!=9999 && airTemperature!=-9999){
                    airTemperature/=10;
                context.write(new Text(year),new IntWritable(airTemperature));
                }
       }
  } 

  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

     public void reduce(Text key, Iterable<IntWritable> values, Context context) 
       throws IOException, InterruptedException {

         int maxValue=Integer.MIN_VALUE;

         Iterator<IntWritable> itr = values.iterator();
         while(itr.hasNext()){
             maxValue = Math.max(maxValue,itr.next().get());
         }
         context.write(key, new IntWritable(maxValue));
     }
  }

  public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();

         Job job = new Job(conf, "temparature");
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);

     job.setMapperClass(Map.class);
     job.setReducerClass(Reduce.class);
     job.setCombinerClass(Reduce.class);  

     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(IntWritable.class);

     job.setInputFormatClass(TextInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);

     FileInputFormat.addInputPath(job, new Path("hdfs://localhost:8020/input/"));
     FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/output/"));

     // FileInputFormat.addInputPath(job, new Path(args[0]));
    //  FileOutputFormat.setOutputPath(job, new Path(args[1]));

     job.waitForCompletion(true);
  }

 }

天气数据集:

0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991991999999=9999[9999]

首先让我解释一下我认为代码哪里出了问题,然后我将提供一个工作示例。问题出在减速器中的 if else 语句上。

这是 if 部分:

if current_city == city:
    if temperature > current_max:
        current_max = temperature

只有在同一个城市被列出两次时才会发生这种情况,更重要的是这是代码检查新城市的 temperature 是否为大于 current_max.

我怀疑大部分时间会花在语句的else部分:

else:
    if current_city:
        print '%s\t%s' % (current_city, current_max)
    current_max = temperature
    current_city = city

这里有两个问题:

  1. 当定义了current_city时,程序总是打印一行。这就是从 reducer 生成城市列表的原因。

  2. 该程序还协助 current_max 变量而不检查 temperature 是否更大。

这是一个应该可以工作的减速器:

import sys

current_city = None
current_max = 0
city = None

for line in sys.stdin:
    line = line.strip()

    city, temperature = line.rsplit('\t', 1)

    try:
        temperature = float(temperature)
    except ValueError:
        continue

    if temperature > current_max:
        current_max = temperature
        current_city = city

print '%s\t%s' % (current_city, current_max)

我要提的最后一件事是设置 current_max = 0 不是一个好主意。摄氏温度很容易低于零。如果您的城市列表和温度是在冬季,则有可能 none 个城市的温度超过 0,代码将 return:

None    0.0