Import JSON into HBase using a Pig script

I am trying to write a Pig script that lets me load JSON (pulled from Elasticsearch and dumped to HDFS).

I have been struggling with this for days; perhaps someone can shed some light on the problem I am running into.

Here is a quick Pig script I wrote that reads data from HBase, makes a trivial modification, and stores it back into HBase (just to confirm everything works):

REGISTER hbase-common-1.1.1.jar
REGISTER /tmp/udfs/json-simple-1.1.1.jar
REGISTER /tmp/udfs/elephant-bird-hadoop-compat-4.9.jar
REGISTER /tmp/udfs/elephant-bird-pig-4.9.jar
REGISTER /user/hdfs/share/libs/guava-11.0.jar
REGISTER /user/hdfs/share/libs/zookeeper-3.4.6.2.2.4.2-2.jar

set hbase.zookeeper.quorum 'list of servers';    

raw = LOAD 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:a', '-loadKey true -limit 5') AS (id:bytearray, a:chararray);
keys = FOREACH raw GENERATE id, CONCAT(a, '1');

keys = LIMIT keys 1;

STORE keys INTO 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:id esinfo:a');    

Running this script reads the data from HBase and stores it back, and it works perfectly.

I then tried modifying the script to load the data from a JSON file instead of HBase:

REGISTER hbase-common-1.1.1.jar
REGISTER /tmp/udfs/json-simple-1.1.1.jar
REGISTER /tmp/udfs/elephant-bird-hadoop-compat-4.9.jar
REGISTER /tmp/udfs/elephant-bird-pig-4.9.jar
REGISTER /user/hdfs/share/libs/guava-11.0.jar
REGISTER /user/hdfs/share/libs/zookeeper-3.4.6.2.2.4.2-2.jar

set hbase.zookeeper.quorum 'list of servers';

raw_data = LOAD '/user/hdfs/input/EsImports/2014-04-22.json' using com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad') as (json:map[]); 
keys = FOREACH raw_data GENERATE
    json#'sid' as id:bytearray,
    json#'userAgent' as a:chararray;

limit_keys = LIMIT keys 1;

STORE limit_keys INTO 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:id esinfo:a');

This is the script that fails. My feeling is that it has something to do with the schema of the data being loaded, but when I DESCRIBE and DUMP the data it appears to have exactly the same structure.
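(For reference, the comparison was done with Pig's standard diagnostic operators, run against the keys alias from the two scripts above:)

DESCRIBE keys;  -- prints the declared schema of the alias
DUMP keys;      -- prints the actual tuples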

The error I get when the script fails is the following:

ERROR 2244: Job job_1439978375936_0215 failed, hadoop does not return any error message

Full error log:

Log Type: syslog
Log Upload Time: Mon Aug 24 13:28:43 +0200 2015
Log Length: 4121
2015-08-24 13:28:35,504 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1439978375936_0238_000001
2015-08-24 13:28:35,910 WARN [main] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-08-24 13:28:35,921 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Executing with tokens:
2015-08-24 13:28:35,921 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: YARN_AM_RM_TOKEN, Service: , Ident: (appAttemptId { application_id { id: 238 cluster_timestamp: 1439978375936 } attemptId: 1 } keyId: 176959833)
2015-08-24 13:28:36,056 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: mapreduce.job, Service: job_1439978375936_0236, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@331fef77)
2015-08-24 13:28:36,057 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: RM_DELEGATION_TOKEN, Service: {ip removed}, Ident: (owner=darryn, renewer=mr token, realUser=hcat, issueDate=1440415651774, maxDate=1441020451774, sequenceNumber=176, masterKeyId=149)
2015-08-24 13:28:36,070 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Using mapred newApiCommitter.
2015-08-24 13:28:36,699 WARN [main] org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2015-08-24 13:28:36,804 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: OutputCommitter set in config null
2015-08-24 13:28:36,950 FATAL [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Error starting MRAppMaster
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/mapreduce/TableInputFormat
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.pig.impl.PigContext.resolveClassName(PigContext.java:657)
    at org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:726)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore.getStoreFunc(POStore.java:251)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.getCommitters(PigOutputCommitter.java:88)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.<init>(PigOutputCommitter.java:71)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat.getOutputCommitter(PigOutputFormat.java:289)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.call(MRAppMaster.java:470)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.call(MRAppMaster.java:452)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.callWithJobClassLoader(MRAppMaster.java:1541)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.createOutputCommitter(MRAppMaster.java:452)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.serviceInit(MRAppMaster.java:371)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.run(MRAppMaster.java:1499)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.initAndStartAppMaster(MRAppMaster.java:1496)
    at org.apache.hadoop.mapreduce.v2.app.MRAppMaster.main(MRAppMaster.java:1429)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.TableInputFormat
    at java.net.URLClassLoader.run(URLClassLoader.java:366)
    at java.net.URLClassLoader.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 20 more
2015-08-24 13:28:36,954 INFO [main] org.apache.hadoop.util.ExitUtil: Exiting with status 1
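The FATAL line points at a classpath problem: the application master cannot find org.apache.hadoop.hbase.mapreduce.TableInputFormat, a class that (in HBase 1.x) lives in the hbase-server artifact rather than in hbase-common, which is the only HBase jar the script registers. One plausible reading (not verified) is that when the load side is also HBaseStorage, Pig ships the HBase jars along with the job, but with a plain HDFS load it does not, so the store side's classes never reach the application master. A hedged sketch of a direct fix, assuming the jar is available at a path like the other registered jars:

REGISTER /tmp/udfs/hbase-server-1.1.1.jar  -- hypothetical path; this jar contains org.apache.hadoop.hbase.mapreduce.*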

Edit:

I have since noticed some interesting behaviour: if I store the data out with PigStorage, specifying the -schema option, and then load that file back in a separate script (still using PigStorage), I can insert it straight into HBase. This makes me suspect the problem is related to how the schema is stored.

The solution I ended up with is by no means optimal, but it works well.

After reading the data from the JSON file and generating the schema, you store it back to a file with PigStorage, then read that file back in:

fs -rm -r /tmp/estest2
STORE test INTO '/tmp/estest2' USING PigStorage('\t', '-schema');

processed_data = LOAD '/tmp/estest2' USING PigStorage('\t');

EXEC; -- forces the script to execute up to this point before continuing

What I suspect is happening is that HBaseStorage misinterprets the elephant-bird types produced by JsonLoader, but it does understand PigStorage's types, which is why the round trip through PigStorage lets the data be loaded into HBase.
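If that suspicion is right, explicitly casting the map values to plain Pig types before the STORE might avoid the file round trip entirely. This is an untested sketch, not something I have verified:

typed = FOREACH raw_data GENERATE
    (bytearray)json#'sid' AS id,        -- explicit cast instead of a bare AS schema
    (chararray)json#'userAgent' AS a;
STORE typed INTO 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:a');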

I also found something else while doing this: your data alias needs an 'id' field, but that field must not be listed in the column argument passed to HBaseStorage; the first field of the relation is used as the row key.

A simplified working script using this solution looks like this:

REGISTER hbase-common-1.1.1.jar
REGISTER /tmp/udfs/json-simple-1.1.1.jar
REGISTER /tmp/udfs/elephant-bird-hadoop-compat-4.9.jar
REGISTER /tmp/udfs/elephant-bird-pig-4.9.jar
REGISTER /user/hdfs/share/libs/guava-11.0.jar
REGISTER /user/hdfs/share/libs/zookeeper-3.4.6.2.2.4.2-2.jar

set hbase.zookeeper.quorum 'list of servers';

raw_data = LOAD '/user/hdfs/input/EsImports/2014-04-22.json' using com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad') as (json:map[]); 
keys = FOREACH raw_data GENERATE
    json#'sid' AS id:bytearray,    -- becomes the row key; not listed in the HBaseStorage column spec
    json#'userAgent' AS a:chararray;

limit_keys = LIMIT keys 1;

-- This is super hacky, but it works
-- (fs -rm fails if the directory does not exist yet)
fs -rm -r /tmp/estest2
STORE limit_keys INTO '/tmp/estest2' USING PigStorage('\t', '-schema');
processed_data = LOAD '/tmp/estest2' USING PigStorage('\t');

EXEC; -- forces execution up to this point before the HBase insert starts

STORE processed_data INTO 'hbase://esimporttest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('esinfo:a');