Spark YARN cluster connection to HBase error

I have an application that parses VCF files and inserts the data into HBase. The application runs without problems with Apache Spark when using a local master, but when I run it on an Apache Spark YARN cluster, I get the following error:

17/03/31 10:36:09 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:10 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:11 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:12 INFO yarn.Client: Application report for application_1490344846293_0020 (state: FINISHED)
17/03/31 10:36:12 INFO yarn.Client: 
     client token: N/A
     diagnostics: User class threw exception: java.lang.RuntimeException: org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the locations
     ApplicationMaster host: 192.168.0.14
     ApplicationMaster RPC port: 0
     queue: default
     start time: 1490956367991
     final status: FAILED
     tracking URL: http://master1:8088/proxy/application_1490344846293_0020/
     user: ubuntu
Exception in thread "main" org.apache.spark.SparkException: Application application_1490344846293_0020 finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1167)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1213)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/03/31 10:36:12 INFO util.ShutdownHookManager: Shutdown hook called
17/03/31 10:36:12 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-e6867ef3-fad4-424a-b6d3-f79f48bd65ea

I use the following code to connect to HBase:

    package com.mycompany.app;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.spark.api.java.function.VoidFunction;

    import java.io.IOException;

    class HbaseAppender implements VoidFunction<Put> {


        private final String TABLE_NAME = "data";
        private final String COLUMN_FAMILY_NAME = "v_data";


        static private HTable hTable;


        public HbaseAppender(){
            init();
        }

        //method used to establish connection to Hbase
        private void init(){
            TableName tableName;
            Configuration hconf = HBaseConfiguration.create();
            hconf.set("hbase.zookeeper.property.clientPort", "2181");
            hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
            hconf.set("hbase.zookeeper.quorum", "master1");

            try {
                Connection connection = ConnectionFactory.createConnection();

                // create the table with a single column family if it does not already exist
                Admin admin = connection.getAdmin();
                tableName = TableName.valueOf(TABLE_NAME);
                if(!admin.tableExists(tableName)) {
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(COLUMN_FAMILY_NAME);
                    HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
                    hTableDescriptor.addFamily(hColumnDescriptor);
                    admin.createTable(hTableDescriptor);
                }
                this.hTable = new HTable(tableName, connection);
            }
            catch (IOException e){
                throw new RuntimeException(e);
            }
        }

        @Override
        public void call(Put put) throws Exception {
            this.hTable.put(put);
        }
    }

If we run on a cluster with a single connection, it will not work: the connection cannot be shipped to every node (it is not serializable), and creating a connection for each element of the RDD is not an option either. So the solution is to use saveAsNewAPIHadoopDataset, which creates a connection on each node of the cluster and saves all the elements of the RDD to HBase (or to HDFS, depending on the configuration).
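
For reference, here is a minimal sketch of that approach in Java. It reuses the table name ("data") and ZooKeeper settings from the code above; the class name HbasePutWriter and the method writeToHbase are hypothetical, and the RDD of Put objects is assumed to come from the VCF parsing step:

    package com.mycompany.app;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    class HbasePutWriter {

        // Saves an RDD of Puts to the "data" table via saveAsNewAPIHadoopDataset.
        // Each executor opens its own HBase connection, so nothing non-serializable
        // has to be shipped from the driver.
        static void writeToHbase(JavaRDD<Put> puts) throws Exception {
            Configuration hconf = HBaseConfiguration.create();
            hconf.set("hbase.zookeeper.quorum", "master1");
            hconf.set("hbase.zookeeper.property.clientPort", "2181");
            hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
            hconf.set(TableOutputFormat.OUTPUT_TABLE, "data");

            // The Job only carries the output format and configuration; no MapReduce job is launched.
            Job job = Job.getInstance(hconf);
            job.setOutputFormatClass(TableOutputFormat.class);
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(Put.class);

            // TableOutputFormat expects (row key, Put) pairs.
            JavaPairRDD<ImmutableBytesWritable, Put> pairs = puts.mapToPair(
                    put -> new Tuple2<>(new ImmutableBytesWritable(put.getRow()), put));

            pairs.saveAsNewAPIHadoopDataset(job.getConfiguration());
        }
    }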