Hadoop map-reduce 映射程序编程
Hadoop map-reduce mapper programming
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
 * Mapper (old {@code org.apache.hadoop.mapred} API) that emits every space-separated word
 * of an input line together with the position at which that word starts.
 */
public class ADDMapper extends MapReduceBase implements Mapper<LongWritable,
Text,Text,LongWritable>
{
    /**
     * Scans {@code value} once and emits one {@code (word, position)} pair per non-empty
     * word, where words are separated by the space character.
     *
     * <p>The emitted position is {@code key.get()} plus the character index of the word
     * inside the line. NOTE(review): this assumes {@code key} is the offset of the line
     * within the input (as a line-oriented input format would supply) and that character
     * indexes equal byte offsets (single-byte text) - confirm against the job setup.
     *
     * @param key    key of the input record; its numeric value is added to every position
     * @param value  the line of text to tokenize
     * @param output collector receiving the {@code (word, position)} pairs
     * @param r      progress reporter (unused)
     * @throws IOException if the collector fails to accept a pair
     */
    @Override
    public void map(LongWritable key, Text value,OutputCollector<Text, LongWritable> output, Reporter r)throws IOException
    {
        String line = value.toString();
        // LongWritable cannot be used in arithmetic directly; unwrap it once.
        long lineOffset = key.get();
        int length = line.length();
        int wordStart = -1; // -1 while the scan is between words
        for (int i = 0; i <= length; i++)
        {
            // The end of the line terminates a word just like a space does, so the
            // last word is emitted even when the line has no trailing space.
            boolean boundary = (i == length) || (line.charAt(i) == ' ');
            if (!boundary)
            {
                if (wordStart < 0)
                {
                    wordStart = i;
                }
            }
            else if (wordStart >= 0)
            {
                output.collect(new Text(line.substring(wordStart, i)),
                        new LongWritable(lineOffset + wordStart));
                wordStart = -1;
            }
        }
    }
}
我想获取每个字符串(单词)的索引值,并按字符串排序输出。
上面的代码既没有给出索引值,也没有对字符串进行排序(shuffle)。
例如,
输入文件:
hi how are you
hi i am right.
how is your job.
hi are you ok.
输出:
am 50
are 7,33
hi 0,30,44
how 3,14
.
.
你好 Shivendra,我写了下面的 mapper 逻辑,它可以帮助你找到每个字符串的索引,并按字符串排序输出。
此代码的输出是已排序的字符串及其索引,之后你可以在此输出上运行 reducer。
// Emits (word, index) for every whole-word occurrence of each distinct word of the line,
// visiting the words in sorted order. Indexes are character positions within the line.
String str=value.toString();
// Collect the distinct words directly into a sorted map. Empty tokens (produced by an
// empty line or by leading/consecutive spaces) are skipped: searching for "" with
// indexOf never returns -1, which would make the loop below spin forever.
Map<String,Integer> map = new TreeMap<String,Integer>();
for (String token : str.split(" ")) { //split into words
    if (!token.isEmpty()) {
        map.put(token, 1);
    }
}
for (Map.Entry<String,Integer> entry : map.entrySet()) {
    String word = entry.getKey();
    //find every index of the word
    int index = str.indexOf(word);
    while (index >= 0) {
        int end = index + word.length();
        // Only accept matches delimited by spaces or the line boundaries, so that
        // e.g. "i" is not reported inside "hi" and "you" is not reported inside "your".
        boolean startsWord = index == 0 || str.charAt(index - 1) == ' ';
        boolean endsWord = end == str.length() || str.charAt(end) == ' ';
        if (startsWord && endsWord) {
            output.collect(new Text(word),new LongWritable(index));
        }
        index = str.indexOf(word, index + 1);
    }
}
这个逻辑的输出:
am:20,
are:7,
are:50,
hi:0,
hi:15,
hi:47,
how:3,
how:30,
i:1,
i:16,
i:18,
i:24,
i:34,
i:48,
is:34,
job.:42,
ok.:58,
right.:23,
you:11,
you:37,
you:54,
your:37
可能对你有帮助。
请运行下面的代码,它可以正常运行,并给出你预期的输出。
请在命令行参数中提供输入路径和输出路径(args[0]、args[1])。
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class IndexCount {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String str=value.toString();
String[] tokens = str.split(" "); //split into words
//create hashmap for unique word
HashMap<String,Integer> uniqueString = new HashMap<String,Integer>();
for(int i=0;i<tokens.length;i++){
uniqueString.put(tokens[i],1);
}
//for sorting create TreeMap from above hash map
TreeMap<String, Integer> map = new TreeMap<String,Integer>(uniqueString);
for (Entry<String, Integer> entry : map.entrySet()) {
int index=0;
//find the index of the word
index = str.indexOf((String)entry.getKey());
while (index >= 0) {
output.collect(new Text((String)entry.getKey()),new IntWritable(index));
index = str.indexOf((String)entry.getKey(), index + 1);
}
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
while (values.hasNext()) {
output.collect(key, new IntWritable(values.next().get()));
}
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("indexfinder");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
请运行下面的代码,它给出预期的输出。
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Index {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String str=value.toString();
String[] tokens = str.split(" "); //split into words
//create hashmap for unique word
HashMap<String,Integer> uniqueString = new HashMap<String,Integer>();
for(int i=0;i<tokens.length;i++){
uniqueString.put(tokens[i],1);
}
//for sorting create TreeMap from above hash map
TreeMap<String, Integer> map = new TreeMap<String,Integer>(uniqueString);
Configuration conf=context.getConfiguration();
int strIndex = 0;
for (Entry<String, Integer> entry : map.entrySet()) {
//int index=0;
strIndex=conf.getInt("index", 0);
//find the index of the word
int index = str.indexOf((String)entry.getKey());
while (index >= 0) {
index+=strIndex;
context.write(new Text((String)entry.getKey()),new IntWritable(index));
index = str.indexOf((String)entry.getKey(), index + 1);
}
}
conf.setInt("index", strIndex+str.length());
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
for (IntWritable val : values) {
context.write(key, new IntWritable(val.get()));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInt("index", 0);
Job job = new Job(conf, "index");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("input"));
FileOutputFormat.setOutputPath(job, new Path("output"));
job.waitForCompletion(true);
}
}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
 * Mapper (old {@code org.apache.hadoop.mapred} API) that emits every space-separated word
 * of an input line together with the position at which that word starts.
 */
public class ADDMapper extends MapReduceBase implements Mapper<LongWritable,
Text,Text,LongWritable>
{
    /**
     * Scans {@code value} once and emits one {@code (word, position)} pair per non-empty
     * word, where words are separated by the space character.
     *
     * <p>The emitted position is {@code key.get()} plus the character index of the word
     * inside the line. NOTE(review): this assumes {@code key} is the offset of the line
     * within the input (as a line-oriented input format would supply) and that character
     * indexes equal byte offsets (single-byte text) - confirm against the job setup.
     *
     * @param key    key of the input record; its numeric value is added to every position
     * @param value  the line of text to tokenize
     * @param output collector receiving the {@code (word, position)} pairs
     * @param r      progress reporter (unused)
     * @throws IOException if the collector fails to accept a pair
     */
    @Override
    public void map(LongWritable key, Text value,OutputCollector<Text, LongWritable> output, Reporter r)throws IOException
    {
        String line = value.toString();
        // LongWritable cannot be used in arithmetic directly; unwrap it once.
        long lineOffset = key.get();
        int length = line.length();
        int wordStart = -1; // -1 while the scan is between words
        for (int i = 0; i <= length; i++)
        {
            // The end of the line terminates a word just like a space does, so the
            // last word is emitted even when the line has no trailing space.
            boolean boundary = (i == length) || (line.charAt(i) == ' ');
            if (!boundary)
            {
                if (wordStart < 0)
                {
                    wordStart = i;
                }
            }
            else if (wordStart >= 0)
            {
                output.collect(new Text(line.substring(wordStart, i)),
                        new LongWritable(lineOffset + wordStart));
                wordStart = -1;
            }
        }
    }
}
我想获取每个字符串(单词)的索引值,并按字符串排序输出。
上面的代码既没有给出索引值,也没有对字符串进行排序(shuffle)。
例如,
输入文件:
hi how are you
hi i am right.
how is your job.
hi are you ok.
输出: am 50 are 7,33 hi 0,30,44 how 3,14 . .
你好 Shivendra,我写了下面的 mapper 逻辑,它可以帮助你找到每个字符串的索引,并按字符串排序输出。此代码的输出是已排序的字符串及其索引,之后你可以在此输出上运行 reducer。
// Emits (word, index) for every whole-word occurrence of each distinct word of the line,
// visiting the words in sorted order. Indexes are character positions within the line.
String str=value.toString();
// Collect the distinct words directly into a sorted map. Empty tokens (produced by an
// empty line or by leading/consecutive spaces) are skipped: searching for "" with
// indexOf never returns -1, which would make the loop below spin forever.
Map<String,Integer> map = new TreeMap<String,Integer>();
for (String token : str.split(" ")) { //split into words
    if (!token.isEmpty()) {
        map.put(token, 1);
    }
}
for (Map.Entry<String,Integer> entry : map.entrySet()) {
    String word = entry.getKey();
    //find every index of the word
    int index = str.indexOf(word);
    while (index >= 0) {
        int end = index + word.length();
        // Only accept matches delimited by spaces or the line boundaries, so that
        // e.g. "i" is not reported inside "hi" and "you" is not reported inside "your".
        boolean startsWord = index == 0 || str.charAt(index - 1) == ' ';
        boolean endsWord = end == str.length() || str.charAt(end) == ' ';
        if (startsWord && endsWord) {
            output.collect(new Text(word),new LongWritable(index));
        }
        index = str.indexOf(word, index + 1);
    }
}
这个逻辑的输出: am:20, are:7, are:50, hi:0, hi:15, hi:47, how:3, how:30, i:1, i:16, i:18, i:24, i:34, i:48, is:34, job.:42, ok.:58, right.:23, you:11, you:37, you:54, your:37
可能对你有帮助。
请运行下面的代码,它可以正常运行,并给出你预期的输出。
请在命令行参数中提供输入路径和输出路径(args[0]、args[1])。
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class IndexCount {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String str=value.toString();
String[] tokens = str.split(" "); //split into words
//create hashmap for unique word
HashMap<String,Integer> uniqueString = new HashMap<String,Integer>();
for(int i=0;i<tokens.length;i++){
uniqueString.put(tokens[i],1);
}
//for sorting create TreeMap from above hash map
TreeMap<String, Integer> map = new TreeMap<String,Integer>(uniqueString);
for (Entry<String, Integer> entry : map.entrySet()) {
int index=0;
//find the index of the word
index = str.indexOf((String)entry.getKey());
while (index >= 0) {
output.collect(new Text((String)entry.getKey()),new IntWritable(index));
index = str.indexOf((String)entry.getKey(), index + 1);
}
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
while (values.hasNext()) {
output.collect(key, new IntWritable(values.next().get()));
}
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("indexfinder");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
请运行下面的代码,它给出预期的输出。
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Index {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String str=value.toString();
String[] tokens = str.split(" "); //split into words
//create hashmap for unique word
HashMap<String,Integer> uniqueString = new HashMap<String,Integer>();
for(int i=0;i<tokens.length;i++){
uniqueString.put(tokens[i],1);
}
//for sorting create TreeMap from above hash map
TreeMap<String, Integer> map = new TreeMap<String,Integer>(uniqueString);
Configuration conf=context.getConfiguration();
int strIndex = 0;
for (Entry<String, Integer> entry : map.entrySet()) {
//int index=0;
strIndex=conf.getInt("index", 0);
//find the index of the word
int index = str.indexOf((String)entry.getKey());
while (index >= 0) {
index+=strIndex;
context.write(new Text((String)entry.getKey()),new IntWritable(index));
index = str.indexOf((String)entry.getKey(), index + 1);
}
}
conf.setInt("index", strIndex+str.length());
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
for (IntWritable val : values) {
context.write(key, new IntWritable(val.get()));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInt("index", 0);
Job job = new Job(conf, "index");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("input"));
FileOutputFormat.setOutputPath(job, new Path("output"));
job.waitForCompletion(true);
}
}