java.io.IOException in MapReduce
I want to use MapReduce to get the maximum and minimum values for each year in a txt file. The contents of the file look like this:
1979 23 23 2 43 24 25 26 26 26 26 25 26
1980 26 27 28 28 28 30 31 31 31 30 30 30
1981 31 32 32 32 33 34 35 36 36 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38
1985 38 39 39 39 39 41 41 41 00 40 39 39
The first column represents the year.
I want MapReduce to give me a final output like this:
1979 2, 26
1980 26, 31
...
So I wrote the code in Java like this:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxValue_MinValue {

    public static class E_Mappter extends Mapper<Object, Text, Text, IntWritable> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] elements = line.split("\\s");
            Text outputKey = new Text(elements[0]);
            for (int i = 1; i < elements.length; i++) {
                context.write(outputKey, new IntWritable(Integer.parseInt(elements[i])));
            }
        }
    }

    public static class E_Reducer extends Reducer<Text, IntWritable, Text, Text> {
        public void reduce(Text inKey, Iterable<IntWritable> inValues, Context context) throws IOException, InterruptedException {
            int maxTemp = 0;
            int minTemp = 0;
            for (IntWritable ele : inValues) {
                if (ele.get() > maxTemp) {
                    maxTemp = ele.get();
                }
                if (ele.get() < minTemp) {
                    minTemp = ele.get();
                }
            }
            context.write(inKey, new Text("Max is " + maxTemp + ", Min is " + minTemp));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Max value, min value for each year");
        job.setJarByClass(MaxValue_MinValue.class);
        job.setMapperClass(E_Mappter.class);
        job.setReducerClass(E_Reducer.class);
        job.setCombinerClass(E_Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
But when I run it, I get the following error message:
hadoop@steven81-HP:/usr/local/hadoop277$ ./bin/hadoop jar ./myApp/MinValue_MaxValue.jar /user/hadoop/input/Electrical__Consumption.txt /user/hadoop/output7
19/04/10 16:59:21 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
19/04/10 16:59:21 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
19/04/10 16:59:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/04/10 16:59:22 INFO input.FileInputFormat: Total input paths to process : 1
19/04/10 16:59:22 INFO mapreduce.JobSubmitter: number of splits:1
19/04/10 16:59:22 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1076320101_0001
19/04/10 16:59:23 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
19/04/10 16:59:23 INFO mapreduce.Job: Running job: job_local1076320101_0001
19/04/10 16:59:23 INFO mapred.LocalJobRunner: OutputCommitter set in config null
19/04/10 16:59:23 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/10 16:59:23 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
19/04/10 16:59:23 INFO mapred.LocalJobRunner: Waiting for map tasks
19/04/10 16:59:23 INFO mapred.LocalJobRunner: Starting task: attempt_local1076320101_0001_m_000000_0
19/04/10 16:59:23 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/10 16:59:23 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
19/04/10 16:59:23 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/user/hadoop/input/Electrical__Consumption.txt:0+204
19/04/10 16:59:23 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
19/04/10 16:59:23 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
19/04/10 16:59:23 INFO mapred.MapTask: soft limit at 83886080
19/04/10 16:59:23 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
19/04/10 16:59:23 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
19/04/10 16:59:23 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
19/04/10 16:59:24 INFO mapred.MapTask: Starting flush of map output
19/04/10 16:59:24 INFO mapred.LocalJobRunner: map task executor complete.
19/04/10 16:59:24 INFO mapreduce.Job: Job job_local1076320101_0001 running in uber mode : false
19/04/10 16:59:24 INFO mapreduce.Job: map 0% reduce 0%
19/04/10 16:59:24 WARN mapred.LocalJobRunner: job_local1076320101_0001
java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.IntWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.IntWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1077)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at test.map.reduce.MaxValue_MinValue$E_Mappter.map(MaxValue_MinValue.java:23)
at test.map.reduce.MaxValue_MinValue$E_Mappter.map(MaxValue_MinValue.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/04/10 16:59:25 INFO mapreduce.Job: Job job_local1076320101_0001 failed with state FAILED due to: NA
19/04/10 16:59:25 INFO mapreduce.Job: Counters: 0
I'm confused by the error "Type mismatch in value from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.IntWritable", because the map's output is (Text, IntWritable) and the reduce's input is also (Text, IntWritable). So I don't understand why this happens. Can anyone help me?
The Combiner must be able to accept the data coming from the Mapper, and it must emit data that can be used as input to the Reducer. In your case, the Combiner's output type is <Text, Text>, but the Reducer's input type is <Text, IntWritable>, so they don't match.
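This also explains why the failure is reported at map time: you never set the map output value class explicitly, so Hadoop falls back to job.setOutputValueClass(Text.class) when validating what the mapper writes, and the mapper writes IntWritable. If you want to keep the shuffle-based design, one fix (an untested sketch of the driver wiring, not the only possible arrangement) is to drop the combiner and declare the map output types explicitly:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Max value, min value for each year");
job.setJarByClass(MaxValue_MinValue.class);
job.setMapperClass(E_Mappter.class);
job.setReducerClass(E_Reducer.class);
// no setCombinerClass: E_Reducer emits Text values, which cannot be
// fed back into a reducer that expects IntWritable values
job.setMapOutputKeyClass(Text.class);          // what E_Mappter writes
job.setMapOutputValueClass(IntWritable.class); // what E_Mappter writes
job.setOutputKeyClass(Text.class);             // what E_Reducer writes
job.setOutputValueClass(Text.class);           // what E_Reducer writes
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

Independently of the type wiring, the reducer should also seed minTemp with Integer.MAX_VALUE and maxTemp with Integer.MIN_VALUE rather than 0; as written, a year whose readings are all positive would report a minimum of 0.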
That said, you don't actually need MapReduce to solve this problem, because all of the data for each year is available on a single line and you never need to compare values across lines:
String line = value.toString();
String[] elements = line.split("\\s");
Text year = new Text(elements[0]);
int maxTemp = Integer.MIN_VALUE;
int minTemp = Integer.MAX_VALUE;
int temp;
for (int i = 1; i < elements.length; i++) {
    temp = Integer.parseInt(elements[i]);
    if (temp < minTemp) {
        minTemp = temp;
    }
    if (temp > maxTemp) { // checked independently, so the first element can set both
        maxTemp = temp;
    }
}
System.out.println("For year " + year + ", the minimum temperature was " + minTemp + " and the maximum temperature was " + maxTemp);