In MapReduce, how do I send an ArrayList as a value from the mapper to the reducer?
How can we pass an ArrayList as a value from the mapper to the reducer?

My code basically applies a set of rules and creates new values (Strings) based on those rules. I keep all of the output (generated after rule execution) in a list, and I now need to send this output (the mapper values) to the reducer, but I have no way of doing so.

Can someone point me in the right direction?

Adding the code:
package develop;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import utility.RulesExtractionUtility;

public class CustomMap {

    public static class CustomerMapper extends Mapper<Object, Text, Text, Text> {
        private Map<String, String> rules;

        @Override
        public void setup(Context context) {
            try {
                URI[] cacheFiles = context.getCacheFiles();
                setupRulesMap(cacheFiles[0].toString());
            } catch (IOException ioe) {
                System.err.println("Error reading state file.");
                System.exit(1);
            }
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Map<String, String> rules = new LinkedHashMap<String, String>();
            // rules.put("targetcolumn[1]", "ASSIGN(source[0])");
            // rules.put("targetcolumn[2]", "INCOME(source[2]+source[3])");
            // rules.put("targetcolumn[3]", "ASSIGN(source[1]");
            // Above are the "rules", which basically create some list values from the source file.
            String[] splitSource = value.toString().split(" ");
            List<String> lists = RulesExtractionUtility.rulesEngineExecutor(splitSource, rules);
            // "lists" holds values like (name, age) for each line of a huge text file,
            // which is what I want to write to the context and pass to the reducer.
            // I haven't implemented the reducer code yet, as I'm stuck on passing the value from the mapper.
            // context.write(new Text(), lists); ---- I do not have a way of doing this
        }

        private void setupRulesMap(String filename) throws IOException {
            Map<String, String> rule = new LinkedHashMap<String, String>();
            BufferedReader reader = new BufferedReader(new FileReader(filename));
            String line = reader.readLine();
            while (line != null) {
                String[] split = line.split("=");
                rule.put(split[0], split[1]);
                line = reader.readLine();
                // rules logic
            }
            rules = rule;
        }
    }

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: customerMapper <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(CustomMap.class);
        job.setMapperClass(CustomerMapper.class);
        job.addCacheFile(new URI("Some HDFS location"));

        URI[] cacheFiles = job.getCacheFiles();
        if (cacheFiles != null) {
            for (URI cacheFile : cacheFiles) {
                System.out.println("Cache file ->" + cacheFile);
            }
        }
        // job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
One way (possibly neither the only one nor the best) would be to:

serialize your list into a String so you can pass it as the output value in the mapper;

deserialize the String and rebuild your list when you read the input value in the reducer.

If you do so, then you should also get rid of all special symbols in the String containing the serialized list (symbols like \n or \t, for instance). An easy way to achieve that is to use Base64-encoded strings.
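For instance, here is a minimal sketch of that idea, assuming the mapper's list is a List&lt;String&gt;. The ListCodec helper and the \u0001 delimiter are made up for illustration; they are not part of the original code:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

public class ListCodec {

    // A delimiter that is unlikely to occur in the data itself.
    private static final String DELIMITER = "\u0001";

    // Mapper side: join the list, then Base64-encode it so that \n, \t
    // and the delimiter all survive the trip through a Text value.
    public static String encode(List<String> values) {
        String joined = String.join(DELIMITER, values);
        return Base64.getEncoder()
                     .encodeToString(joined.getBytes(StandardCharsets.UTF_8));
    }

    // Reducer side: reverse the steps to rebuild the list.
    public static List<String> decode(String encoded) {
        byte[] raw = Base64.getDecoder().decode(encoded);
        String joined = new String(raw, StandardCharsets.UTF_8);
        List<String> values = new ArrayList<>();
        for (String v : joined.split(DELIMITER, -1)) {
            values.add(v);
        }
        return values;
    }
}

The mapper would then do something like context.write(someKey, new Text(ListCodec.encode(lists))), and the reducer would rebuild the list with ListCodec.decode(value.toString()).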
You should send Text objects instead of String objects; you can then call toString() on the object in the reducer. Make sure your driver is configured properly. If you post your code, we can help you further.
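In the meantime, a rough sketch of the reducer side, assuming the mapper joins its list with commas before wrapping it in a Text (the CustomerReducer name is a placeholder):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CustomerReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // toString() recovers the String the mapper sent;
            // split() rebuilds the individual list entries.
            List<String> fields = Arrays.asList(value.toString().split(","));
            context.write(key, new Text(fields.toString()));
        }
    }
}

In the driver, register it with job.setReducerClass(CustomerReducer.class) and declare job.setMapOutputKeyClass(Text.class) and job.setMapOutputValueClass(Text.class) so the framework knows how to serialize the intermediate records.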
To pass an ArrayList from the mapper to the reducer, the objects obviously have to implement the Writable interface. Why don't you try this library?
<dependency>
    <groupId>org.apache.giraph</groupId>
    <artifactId>giraph-core</artifactId>
    <version>1.1.0-hadoop2</version>
</dependency>
It has an abstract class:
public abstract class ArrayListWritable<M extends org.apache.hadoop.io.Writable>
extends ArrayList<M>
implements org.apache.hadoop.io.Writable, org.apache.hadoop.conf.Configurable
You could create your own class from that, filling in the abstract methods and implementing the interface methods with your own code. For example:
public class MyListWritable extends ArrayListWritable<Text>{
...
}
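Filled in, that might look like the sketch below. It assumes Giraph's org.apache.giraph.utils.ArrayListWritable, whose abstract setClass() hook is expected to tell the parent class the element type; check the actual 1.1.0 sources before relying on this:

import org.apache.giraph.utils.ArrayListWritable;
import org.apache.hadoop.io.Text;

public class MyListWritable extends ArrayListWritable<Text> {

    public MyListWritable() {
        super();
    }

    // Tells the parent class which Writable the elements are, so that it
    // can instantiate them when deserializing the list in the reducer.
    @Override
    public void setClass() {
        setClass(Text.class);
    }
}

With that in place, the driver would declare job.setMapOutputValueClass(MyListWritable.class), and the mapper could write the list directly with context.write(key, myList).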