HBase bulk load: append data instead of overwriting it
I am loading data into HBase with the help of MapReduce and bulk load, both implemented in Java. Basically I created a mapper and use HFileOutputFormat2.configureIncrementalLoad for the reduce part (full code at the end of the question); the mapper just reads some bytes from a file and creates a Put. The data is then written into HBase with LoadIncrementalHFiles.doBulkLoad. This all works fine, but when it runs it overwrites the old values in HBase. So I am looking for a way to append the data, like the Append function in the API does.
Thanks for reading, and I hope some of you have an idea that can help me :)
public int run(String[] args) throws Exception {
    int result = 0;
    String outputPath = args[1];
    Configuration configuration = getConf();
    // pass separator, table name and column families to the mapper via the job configuration
    configuration.set("data.seperator", DATA_SEPERATOR);
    configuration.set("hbase.table.name", TABLE_NAME);
    configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
    configuration.set("COLUMN_FAMILY_2", COLUMN_FAMILY_2);
    Job job = Job.getInstance(configuration);
    job.setJarByClass(HBaseBulkLoadDriver.class);
    job.setJobName("Bulk Loading HBase Table::" + TABLE_NAME);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapperClass(HBaseBulkLoadMapper.class);
    FileInputFormat.addInputPaths(job, args[0]);
    // remove a previous output directory and use it as the HFile output path
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);
    HFileOutputFormat2.setOutputPath(job, new Path(outputPath));
    job.setMapOutputValueClass(Put.class);
    Connection c = ConnectionFactory.createConnection(configuration);
    Table t = c.getTable(TableName.valueOf(TABLE_NAME));
    RegionLocator rl = c.getRegionLocator(TableName.valueOf(TABLE_NAME));
    // configure the job to write HFiles that match the table's region boundaries
    HFileOutputFormat2.configureIncrementalLoad(job, t, rl);
    System.out.println("start");
    job.waitForCompletion(true);
    if (job.isSuccessful()) {
        // load the generated HFiles into the table
        HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME);
    } else {
        result = -1;
    }
    return result;
}
public static void doBulkLoad(String pathToHFile, String tableName) {
    try {
        Configuration configuration = new Configuration();
        configuration.set("mapreduce.child.java.opts", "-Xmx1g");
        HBaseConfiguration.addHbaseResources(configuration);
        LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
        //HTable hTable = new HTable(configuration, tableName);
        //loadFfiles.doBulkLoad(new Path(pathToHFile), hTable);
        Connection connection = ConnectionFactory.createConnection(configuration);
        Table table = connection.getTable(TableName.valueOf(tableName));
        Admin admin = connection.getAdmin();
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
        //path, admin, table, region locator
        loadFfiles.doBulkLoad(new Path(pathToHFile), admin, table, regionLocator);
        System.out.println("Bulk Load Completed..");
    } catch (Exception exception) {
        exception.printStackTrace();
    }
}
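The mapper itself is not included above; purely for reference, here is a minimal sketch of what a mapper like the one described could look like. It is an assumption, not the code from the question: it expects tab-separated "rowkey<TAB>value" lines and uses a hypothetical qualifier "col" in column family '0'.

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical sketch, not the mapper from the question: parses one line per record
// and emits a Put keyed by the row key, matching the job setup above
// (ImmutableBytesWritable as map output key, Put as map output value).
public class HBaseBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // expect "rowkey<TAB>value"; skip malformed lines
        String[] fields = line.toString().split("\t", 2);
        if (fields.length < 2) {
            return;
        }
        byte[] rowKey = Bytes.toBytes(fields[0]);
        Put put = new Put(rowKey);
        put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("col"), Bytes.toBytes(fields[1]));
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}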
As requested in the comments, here is the output of the table description. The table was created with the Python happybase API, so I am not sure which option flags the API sets by default...
{NAME => '0', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => '1', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
In HFileOutputFormat2.configureIncrementalLoad() (http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.html#line.408) PutSortReducer is used as the reducer.
In PutSortReducer.reduce() (http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/PutSortReducer.html) the KeyValues are stored in a TreeSet whose comparator compares only the keys. That is why only one value survives.
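A quick illustration of that behavior, with hypothetical row, column and timestamp values: the comparator looks at row, family, qualifier, timestamp and type, but not at the value, so the second cell is treated as a duplicate and dropped.

import java.util.TreeSet;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class PutSortReducerDedupDemo {
    public static void main(String[] args) {
        byte[] row = Bytes.toBytes("row1");
        byte[] family = Bytes.toBytes("0");
        byte[] qualifier = Bytes.toBytes("col");
        long timestamp = 1234L;

        // same sorted structure PutSortReducer uses internally (HBase 1.2.x)
        TreeSet<KeyValue> sorted = new TreeSet<>(KeyValue.COMPARATOR);
        sorted.add(new KeyValue(row, family, qualifier, timestamp, Bytes.toBytes("old value")));
        boolean added = sorted.add(new KeyValue(row, family, qualifier, timestamp, Bytes.toBytes("new value")));

        System.out.println(added);         // false - the second cell collides on the key
        System.out.println(sorted.size()); // 1     - only one of the two values survives
    }
}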
To keep both values, you can create your own reducer based on PutSortReducer in which you keep both of them, and set it after configuring the job:
HFileOutputFormat2.configureIncrementalLoad(job, t, rl);
job.setReducerClass(MyReducer.class);
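A minimal sketch of such a reducer, assuming HBase 1.2.x APIs (MyReducer is only a placeholder name). It keeps the same contract as PutSortReducer, emitting the KeyValues of a row in sorted order, but on a key collision it steps the timestamp back so both cells survive as separate versions of the column (the table above keeps VERSIONS => '3'); the memory-threshold handling that PutSortReducer does for very wide rows is omitted here.

import java.io.IOException;
import java.util.List;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of a reducer based on PutSortReducer: instead of letting colliding cells
// collapse in the TreeSet, it rebuilds the cell with an older timestamp until it
// fits, so both values end up in the HFiles as separate versions of the same column.
public class MyReducer extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts, Context context)
            throws IOException, InterruptedException {
        // same sorted structure as PutSortReducer; HFileOutputFormat2 needs cells in sorted order
        TreeSet<KeyValue> sorted = new TreeSet<>(KeyValue.COMPARATOR);
        for (Put put : puts) {
            for (List<Cell> cells : put.getFamilyCellMap().values()) {
                for (Cell cell : cells) {
                    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                    long ts = kv.getTimestamp();
                    // add() returns false on a key collision; retry with an older timestamp
                    while (!sorted.add(kv)) {
                        ts -= 1;
                        kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell),
                                CellUtil.cloneQualifier(cell), ts, CellUtil.cloneValue(cell));
                    }
                }
            }
        }
        for (KeyValue kv : sorted) {
            context.write(row, kv);
        }
    }
}

Since configureIncrementalLoad() installs PutSortReducer, the setReducerClass(MyReducer.class) call has to come after it, as in the two lines above, otherwise your reducer would be replaced again.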