如何在使用 Hadoop java api 完成 Mapreduce 作业后调用自定义方法?
How to call a custom method after Mapreduce Job completition using Hadoop java api?
我正在尝试 运行 一个 mapreduce 程序,只是为了更好地理解 WordCount。一切都像它想象的那样工作得很好。我想在 MapReduce 程序完成后调用一个函数,在该函数中,我想将在 reduce 步骤中生成的所有部分文件合并到一个包含所有部分文件内容的文本文件中。我看到了相关问题,人们建议使用 FileUtil.copyMerge 函数。我的问题是如何进行函数调用,以便在整个 mapreduce 过程后执行。
public class mapreducetask {
private void filesmerger(){
// I want to merge partfiles here in the function(maybe using FileUtils.copyMerge)
}
public static void main(String [] args) throws Exception{
Configuration cnf = new Configuration();
cnf.set("mapreduce.output.textoutputformat.seperator",":");
Integer numberOfReducers = 3;
Job jb = new Job(cnf,"mapreducejob");
jb.setJarByClass(mapreducetask.class);
jb.setMapperClass(mapper.class);
jb.setNumReduceTasks(numberOfReducers);
jb.setReducerClass(reducer.class);
jb.setOutputKeyClass(Text.class);
jb.setOutputValueClass(IntWritable.class);
jb.setInputFormatClass(customfileinputformat.class);
Path input = new Path("Input");
Path output = new Path ("Output");
FileInputFormat.addInputPath(jb, input);
FileOutputFormat.setOutputPath(jb, output);
// Should I call my merger function here. Location 1
System.exit(jb.waitForCompletion(true)?0:1);
}
}
当我从位置 1 进行调用时(请参阅代码),它似乎甚至在我不想要的 mapreduce 程序之前就已执行。如何在 Mapreduce 过程完成后调用函数。
您在调用 jb.waitForCompletion(true)
之前调用了位置 1 中的代码。您需要在之后调用它(显然不能调用 System.exit()
)。所以:
jb.waitForCompletion(true);
//Run your code
我正在尝试 运行 一个 mapreduce 程序,只是为了更好地理解 WordCount。一切都像它想象的那样工作得很好。我想在 MapReduce 程序完成后调用一个函数,在该函数中,我想将在 reduce 步骤中生成的所有部分文件合并到一个包含所有部分文件内容的文本文件中。我看到了相关问题,人们建议使用 FileUtil.copyMerge 函数。我的问题是如何进行函数调用,以便在整个 mapreduce 过程后执行。
public class mapreducetask {
private void filesmerger(){
// I want to merge partfiles here in the function(maybe using FileUtils.copyMerge)
}
public static void main(String [] args) throws Exception{
Configuration cnf = new Configuration();
cnf.set("mapreduce.output.textoutputformat.seperator",":");
Integer numberOfReducers = 3;
Job jb = new Job(cnf,"mapreducejob");
jb.setJarByClass(mapreducetask.class);
jb.setMapperClass(mapper.class);
jb.setNumReduceTasks(numberOfReducers);
jb.setReducerClass(reducer.class);
jb.setOutputKeyClass(Text.class);
jb.setOutputValueClass(IntWritable.class);
jb.setInputFormatClass(customfileinputformat.class);
Path input = new Path("Input");
Path output = new Path ("Output");
FileInputFormat.addInputPath(jb, input);
FileOutputFormat.setOutputPath(jb, output);
// Should I call my merger function here. Location 1
System.exit(jb.waitForCompletion(true)?0:1);
}
}
当我从位置 1 进行调用时(请参阅代码),它似乎甚至在我不想要的 mapreduce 程序之前就已执行。如何在 Mapreduce 过程完成后调用函数。
您在调用 jb.waitForCompletion(true)
之前调用了位置 1 中的代码。您需要在之后调用它(显然不能调用 System.exit()
)。所以:
jb.waitForCompletion(true);
//Run your code