Hbase MapReduce:如何使用自定义 class 作为映射器 and/or reducer 的值?
Hbase MapReduce: how to use custom class as value for the mapper and/or reducer?
我正在尝试熟悉 Hadoop/Hbase MapReduce 作业以便能够正确编写它们。现在我有一个 Hbase 实例,它带有一个名为 dns 的 table 和一些 DNS 记录。我试图制作一个简单的唯一域计数器来输出文件并且它有效。现在,我只使用 IntWritable
或 Text
,我想知道是否可以为我的 Mapper/Reducer 使用自定义对象。我试着自己做,但我得到了
Error: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:415)
at org.apache.hadoop.mapred.MapTask.access0(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:170)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1011)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
... 9 more
由于我是新手,所以我实际上不知道该怎么做。我猜我必须实现一个或多个接口或扩展抽象 class,但我找不到 here 或在互联网上找到合适的示例。
我尝试从我的 dns table 制作一个简单的域计数器,但使用 class 作为整数的包装器(仅用于教学目的)。我的 地图 class 看起来像这样:
public class Map extends TableMapper<Text, MapperOutputValue> {
private static byte[] columnName = "fqdn".getBytes();
private static byte[] columnFamily = "d".getBytes();
public void map(ImmutableBytesWritable row, Result value, Context context)
throws InterruptedException, IOException {
String fqdn = new String(value.getValue(columnFamily, columnName));
Text key = new Text();
key.set(fqdn);
context.write(key, new MapperOutputValue(1));
}
}
减速机:
public class Reduce extends Reducer<Text, MapperOutputValue, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<MapperOutputValue> values, Context context)
throws IOException, InterruptedException {
int i = 0;
for (MapperOutputValue val : values) {
i += val.getCount();
}
context.write(key, new IntWritable(i));
}
}
我的 Driver/Main 函数的一部分:
TableMapReduceUtil.initTableMapperJob(
"dns",
scan,
Map.class,
Text.class,
MapperOutputValue.class,
job);
/* Set output parameters */
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
正如我所说,MapperOutputValue 只是一个简单的 class,它包含一个私有整数、一个带有参数的构造函数、一个 getter 和一个 setter。我也尝试添加 toString
方法,但它仍然不起作用。
所以我的问题是:使用自定义 classes 作为减速器 mapper/input 的输出的最佳方法是什么?另外,假设我想使用具有多个字段的 class 作为 reducer 的最终输出。这个classimplement/extends应该怎么办?这是个好主意还是我应该坚持使用 "primitives" 作为 IntWritable 或文本?
谢谢!
MapOutputValue
应该实现 Writable
,这样它就可以在 MapReduce 作业中的任务之间进行序列化。将 MapOutputJob
替换为以下内容应该有效:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class DomainCountWritable implements Writable {
private Text domain;
private IntWritable count;
public DomainCountWritable() {
this.domain = new Text();
this.count = new IntWritable(0);
}
public DomainCountWritable(Text domain, IntWritable count) {
this.domain = domain;
this.count = count;
}
public Text getDomain() {
return this.domain;
}
public IntWritable getCount() {
return this.count;
}
public void setDomain(Text domain) {
this.domain = domain;
}
public void setCount(IntWritable count) {
this.count = count;
}
public void readFields(DataInput in) throws IOException {
this.domain.readFields(in);
this.count.readFields(in);
}
public void write(DataOutput out) throws IOException {
this.domain.write(out);
this.count.write(out);
}
@Override
public String toString() {
return this.domain.toString() + "\t" + this.count.toString();
}
}
我正在尝试熟悉 Hadoop/Hbase MapReduce 作业以便能够正确编写它们。现在我有一个 Hbase 实例,它带有一个名为 dns 的 table 和一些 DNS 记录。我试图制作一个简单的唯一域计数器来输出文件并且它有效。现在,我只使用 IntWritable
或 Text
,我想知道是否可以为我的 Mapper/Reducer 使用自定义对象。我试着自己做,但我得到了
Error: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:415)
at org.apache.hadoop.mapred.MapTask.access0(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:170)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1011)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
... 9 more
由于我是新手,所以我实际上不知道该怎么做。我猜我必须实现一个或多个接口或扩展抽象 class,但我找不到 here 或在互联网上找到合适的示例。
我尝试从我的 dns table 制作一个简单的域计数器,但使用 class 作为整数的包装器(仅用于教学目的)。我的 地图 class 看起来像这样:
public class Map extends TableMapper<Text, MapperOutputValue> {
private static byte[] columnName = "fqdn".getBytes();
private static byte[] columnFamily = "d".getBytes();
public void map(ImmutableBytesWritable row, Result value, Context context)
throws InterruptedException, IOException {
String fqdn = new String(value.getValue(columnFamily, columnName));
Text key = new Text();
key.set(fqdn);
context.write(key, new MapperOutputValue(1));
}
}
减速机:
public class Reduce extends Reducer<Text, MapperOutputValue, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<MapperOutputValue> values, Context context)
throws IOException, InterruptedException {
int i = 0;
for (MapperOutputValue val : values) {
i += val.getCount();
}
context.write(key, new IntWritable(i));
}
}
我的 Driver/Main 函数的一部分:
TableMapReduceUtil.initTableMapperJob(
"dns",
scan,
Map.class,
Text.class,
MapperOutputValue.class,
job);
/* Set output parameters */
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
正如我所说,MapperOutputValue 只是一个简单的 class,它包含一个私有整数、一个带有参数的构造函数、一个 getter 和一个 setter。我也尝试添加 toString
方法,但它仍然不起作用。
所以我的问题是:使用自定义 classes 作为减速器 mapper/input 的输出的最佳方法是什么?另外,假设我想使用具有多个字段的 class 作为 reducer 的最终输出。这个classimplement/extends应该怎么办?这是个好主意还是我应该坚持使用 "primitives" 作为 IntWritable 或文本?
谢谢!
MapOutputValue
应该实现 Writable
,这样它就可以在 MapReduce 作业中的任务之间进行序列化。将 MapOutputJob
替换为以下内容应该有效:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class DomainCountWritable implements Writable {
private Text domain;
private IntWritable count;
public DomainCountWritable() {
this.domain = new Text();
this.count = new IntWritable(0);
}
public DomainCountWritable(Text domain, IntWritable count) {
this.domain = domain;
this.count = count;
}
public Text getDomain() {
return this.domain;
}
public IntWritable getCount() {
return this.count;
}
public void setDomain(Text domain) {
this.domain = domain;
}
public void setCount(IntWritable count) {
this.count = count;
}
public void readFields(DataInput in) throws IOException {
this.domain.readFields(in);
this.count.readFields(in);
}
public void write(DataOutput out) throws IOException {
this.domain.write(out);
this.count.write(out);
}
@Override
public String toString() {
return this.domain.toString() + "\t" + this.count.toString();
}
}