如何在 MapReduce 中比较来自不同文件的信息?
How to compare informations from different files in MapReduce?
目的是了解文件 X 与文件 y1、y2、...、yn 的相似程度。
对于每个文件,我提取信息并将它们存储在结构中;假设我从一个文件中进行字数统计,并将结果存储在 HashMap<String, Integer> wordCount
中(还有其他结构存储其他信息)。
所以我需要生成fileX的wordCount;提取 fileY 的 wordCount(预先生成并写入 HDFS 文件);计算这两个字数有多少相似(我不能逐行区分;我需要百分比相似)。
FileX 已修复,需要与 N 个 fileY 进行比较。
所以我的想法是:
Job1:计算fileX信息并写入HDFS。
Job2(map1-map2的chainMapper):
Map1:读取 HashMap<String, Integer> wordCount
个文件 X;将结构传递给 Map2。
Map2:获取 2 个输入,fileX 的结构,fileYs 的目录路径。
Map2计算HashMap<String, Integer> wordCountX
和HashMap<String, Integer> wordCountY
的相似度; reducer 获取所有相似度值并对它们进行排序。
我在 Hadoop - The definitive guide of Tom White
和网上也读过关于 MultipleInputs
的内容,但这不是关于 1 个映射器的两个输入,而是根据输入来区分映射器。所以我想问一下如何将两个值转发给单个映射器;我考虑过使用分布式缓存,但它对这个问题没有用;最后,如何确保每个映射器获得不同的文件 Y。
我尝试更新全局 HashMap<String, Integer> wordCount
但是当新作业开始时,它无法访问该结构(或者更好的是,它是空的)。
public class Matching extends Configured implements Tool{
private static HashMap<String, Integer> wordCountX;
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Matching(), args);
System.exit(res);
} //end main class
public int run(String[] args) throws Exception {
...
}
}
编辑:
看到的答案是一个很好的解决方案。
我添加了生成的代码片段。
正在启动作业:
//configuration and launch of job
Job search = Job.getInstance(getConf(), "2. Merging and searching");
search.setJarByClass(this.getClass());
MultipleInputs.addInputPath(search, creationPath, TextInputFormat.class);
MultipleInputs.addInputPath(search, toMatchPath, TextInputFormat.class);
FileOutputFormat.setOutputPath(search, resultPath);
search.setNumReduceTasks(Integer.parseInt(args[2]));
search.setMapperClass(Map.class);
search.setReducerClass(Reduce.class);
search.setMapOutputKeyClass(ValuesGenerated.class);
search.setMapOutputValueClass(IntWritable.class);
//TODO
search.setOutputKeyClass(NullWritable.class);
search.setOutputValueClass(Text.class);
return search.waitForCompletion(true) ? 0 : 1;
地图合并(在清理阶段):
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
InputSplit split = context.getInputSplit();
Class<? extends InputSplit> splitClass = split.getClass();
FileSplit fileSplit = null;
if (splitClass.equals(FileSplit.class)) {
fileSplit = (FileSplit) split;
} else if (splitClass.getName().equals(
"org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
// begin reflection hackery...
try {
Method getInputSplitMethod = splitClass
.getDeclaredMethod("getInputSplit");
getInputSplitMethod.setAccessible(true);
fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
} catch (Exception e) {
// wrap and re-throw error
throw new IOException(e);
}
// end reflection hackery
}
String filename = fileSplit.getPath().getName();
boolean isKnown;
/*
the two input files are nominated dinamically;
the file0 has some name "023901.txt",
the file1 is the output of a precedent MR job, and is
something like "chars-r-000000"
*/
if(filename.contains(".txt")) {
isKnown = false;
}
else {
isKnown = true;
}
if(isKnown) { //file1, known
ValuesGenerated.setName(new Text(name));
//other values set
//...
context.write(ValuesGenerated, new IntWritable(1));
}
else { //file0, unknown
ValuesGenerated.setName(new Text("unknown"));
//other values set
//...
context.write(ValuesGenerated, new IntWritable(0));
}
}
减少阶段:
public static class Reduce extends Reducer<ValuesGenerated, IntWritable, NullWritable, Text> {
@Override
public void reduce(ValuesGenerated key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
ValuesGenerated known;
ValuesGenerated unk;
String toEmit = null;
for (IntWritable value : values) {
if(value.get() == 1) { //known
known = key;
toEmit = key.toString();
toEmit += "\n " + value;
context.write(NullWritable.get(), new Text(toEmit));
}
else { //unknown
unk = key;
toEmit = key.toString();
toEmit += "\n " + value;
context.write(NullWritable.get(), new Text(toEmit));
}
}
}//end reduce
} //end Reduce class
我遇到了另一个问题,但是我用这个解决方案绕过了它hadoop MultipleInputs fails with ClassCastException
您可以使用数据库,甚至写入文件,而不是全局。
检查频率与HashMaps大小的比例并比较:
HashMap<String, Integer> similarities = new HashMap<String, Integer>();
int matching = 0
Int totalX = getTotal(wordCountX);
int totalY = getTotal(wordCountY);
wordCountX.forEach((k,v)->{
Integer count = wordCountY.get(k);
if (count.getIntValue() / totalY == v.getIntValue() / totalX)
similarities.put(k, Integer.valueOf(v.getIntValue() / totalY);
});
只需添加多个文件输入路径,您就可以将多个文件输入到同一个Mapper。然后,您可以使用 mapperContext 来识别哪个文件拆分来自哪个文件位置。
基本上,
第 1 步:MR 作业
读取文件1+2
在 mapper 中发出 <word, [val1, val2]>
(如果文件从 file1 拆分出来,则 val1 为 1,否则为 0;与 val2 类似)
- 在 reducer 中写入 hashmap
<work, [file1_count, file2_count]>
第 2 步:合并分片(wordcount 不能那么大并且应该适合一台机器)并使用简单的 java 作业创建自定义相似性指标
目的是了解文件 X 与文件 y1、y2、...、yn 的相似程度。
对于每个文件,我提取信息并将它们存储在结构中;假设我从一个文件中进行字数统计,并将结果存储在 HashMap<String, Integer> wordCount
中(还有其他结构存储其他信息)。
所以我需要生成fileX的wordCount;提取 fileY 的 wordCount(预先生成并写入 HDFS 文件);计算这两个字数有多少相似(我不能逐行区分;我需要百分比相似)。
FileX 已修复,需要与 N 个 fileY 进行比较。
所以我的想法是:
Job1:计算fileX信息并写入HDFS。
Job2(map1-map2的chainMapper):
Map1:读取 HashMap<String, Integer> wordCount
个文件 X;将结构传递给 Map2。
Map2:获取 2 个输入,fileX 的结构,fileYs 的目录路径。
Map2计算HashMap<String, Integer> wordCountX
和HashMap<String, Integer> wordCountY
的相似度; reducer 获取所有相似度值并对它们进行排序。
我在 Hadoop - The definitive guide of Tom White
和网上也读过关于 MultipleInputs
的内容,但这不是关于 1 个映射器的两个输入,而是根据输入来区分映射器。所以我想问一下如何将两个值转发给单个映射器;我考虑过使用分布式缓存,但它对这个问题没有用;最后,如何确保每个映射器获得不同的文件 Y。
我尝试更新全局 HashMap<String, Integer> wordCount
但是当新作业开始时,它无法访问该结构(或者更好的是,它是空的)。
public class Matching extends Configured implements Tool{
private static HashMap<String, Integer> wordCountX;
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Matching(), args);
System.exit(res);
} //end main class
public int run(String[] args) throws Exception {
...
}
}
编辑:
看到的答案是一个很好的解决方案。
我添加了生成的代码片段。
正在启动作业:
//configuration and launch of job
Job search = Job.getInstance(getConf(), "2. Merging and searching");
search.setJarByClass(this.getClass());
MultipleInputs.addInputPath(search, creationPath, TextInputFormat.class);
MultipleInputs.addInputPath(search, toMatchPath, TextInputFormat.class);
FileOutputFormat.setOutputPath(search, resultPath);
search.setNumReduceTasks(Integer.parseInt(args[2]));
search.setMapperClass(Map.class);
search.setReducerClass(Reduce.class);
search.setMapOutputKeyClass(ValuesGenerated.class);
search.setMapOutputValueClass(IntWritable.class);
//TODO
search.setOutputKeyClass(NullWritable.class);
search.setOutputValueClass(Text.class);
return search.waitForCompletion(true) ? 0 : 1;
地图合并(在清理阶段):
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
InputSplit split = context.getInputSplit();
Class<? extends InputSplit> splitClass = split.getClass();
FileSplit fileSplit = null;
if (splitClass.equals(FileSplit.class)) {
fileSplit = (FileSplit) split;
} else if (splitClass.getName().equals(
"org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
// begin reflection hackery...
try {
Method getInputSplitMethod = splitClass
.getDeclaredMethod("getInputSplit");
getInputSplitMethod.setAccessible(true);
fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
} catch (Exception e) {
// wrap and re-throw error
throw new IOException(e);
}
// end reflection hackery
}
String filename = fileSplit.getPath().getName();
boolean isKnown;
/*
the two input files are nominated dinamically;
the file0 has some name "023901.txt",
the file1 is the output of a precedent MR job, and is
something like "chars-r-000000"
*/
if(filename.contains(".txt")) {
isKnown = false;
}
else {
isKnown = true;
}
if(isKnown) { //file1, known
ValuesGenerated.setName(new Text(name));
//other values set
//...
context.write(ValuesGenerated, new IntWritable(1));
}
else { //file0, unknown
ValuesGenerated.setName(new Text("unknown"));
//other values set
//...
context.write(ValuesGenerated, new IntWritable(0));
}
}
减少阶段:
public static class Reduce extends Reducer<ValuesGenerated, IntWritable, NullWritable, Text> {
@Override
public void reduce(ValuesGenerated key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
ValuesGenerated known;
ValuesGenerated unk;
String toEmit = null;
for (IntWritable value : values) {
if(value.get() == 1) { //known
known = key;
toEmit = key.toString();
toEmit += "\n " + value;
context.write(NullWritable.get(), new Text(toEmit));
}
else { //unknown
unk = key;
toEmit = key.toString();
toEmit += "\n " + value;
context.write(NullWritable.get(), new Text(toEmit));
}
}
}//end reduce
} //end Reduce class
我遇到了另一个问题,但是我用这个解决方案绕过了它hadoop MultipleInputs fails with ClassCastException
您可以使用数据库,甚至写入文件,而不是全局。
检查频率与HashMaps大小的比例并比较:
HashMap<String, Integer> similarities = new HashMap<String, Integer>();
int matching = 0
Int totalX = getTotal(wordCountX);
int totalY = getTotal(wordCountY);
wordCountX.forEach((k,v)->{
Integer count = wordCountY.get(k);
if (count.getIntValue() / totalY == v.getIntValue() / totalX)
similarities.put(k, Integer.valueOf(v.getIntValue() / totalY);
});
只需添加多个文件输入路径,您就可以将多个文件输入到同一个Mapper。然后,您可以使用 mapperContext 来识别哪个文件拆分来自哪个文件位置。
基本上,
第 1 步:MR 作业
读取文件1+2
在 mapper 中发出
<word, [val1, val2]>
(如果文件从 file1 拆分出来,则 val1 为 1,否则为 0;与 val2 类似)- 在 reducer 中写入 hashmap
<work, [file1_count, file2_count]>
第 2 步:合并分片(wordcount 不能那么大并且应该适合一台机器)并使用简单的 java 作业创建自定义相似性指标