Saving a DataFrame to an HBase table from the Zeppelin Spark interpreter

I need to build an analytics solution with Zeppelin that reads data from Oracle, Hadoop, and HBase. I now need to save the analysis results to HBase through the Zeppelin Spark interpreter. I wrote a demo; it works in my Spark development environment, but it does not work in Zeppelin.

Can anyone help me? Thanks.

 %spark
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

val sql = "select * from analyticresult"

val sqlContext = sqlc
import sqlContext.implicits._

// Run the query whose result will be written to HBase
val list = sqlContext.sql(sql)


// Build a Hadoop Job whose configuration points TableOutputFormat at the target HBase table
def getJob(tableName: String): Job = {
  sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "dn1.hadoop,dn2.hadoop,dn3.hadoop,dn4.hadoop,dn5.hadoop")
  sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
  sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)

  val job = Job.getInstance(sc.hadoopConfiguration)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Result])
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
  job
}

// Convert each row into an HBase Put, using column 8 as the rowkey
val hbaseRDD = list.mapPartitions(iter => {
  iter.map(it => {
    val rowkey = it.getAs[String](8).getBytes // set rowkey
    val put = new Put(rowkey)
    put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("CI"), it.getAs[String](3).getBytes)
    put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("CT"), it.getAs[String](4).getBytes)
    //Bytes.toBytes(it.getLong(9))

    (new ImmutableBytesWritable, put)
  })
})

var table = getJob("HWBTEST") 
hbaseRDD.saveAsNewAPIHadoopDataset(table.getConfiguration)

Zeppelin console output: the code compiles, but an IllegalStateException is thrown:

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@38a0571d
import sqlContext.implicits._
list: org.apache.spark.sql.DataFrame = [cardid: string, lineid: string, amount: string, time: string, busline_name: string, vin: string, carno: string, carcolor: string, gpstime: string, systime: string, status: string, alarm: string, lon: string, lat: string, height: string, speed: string, buslineId: string, stype: string, stationSeq: string, stationName: string, stationMark: string, onPersons: string, offPersons: string, buslineName: string, between_time: double, rn: int]
getJob: (tableName: String)org.apache.hadoop.mapreduce.Job
hbaseRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Put)] = MapPartitionsRDD[276] at mapPartitions at <console>:137
java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:292)
    at org.apache.hadoop.mapreduce.Job.toString(Job.java:457)
    at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner(ScalaRunTime.scala:324)
    at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
    at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
    at .<init>(<console>:10)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at sun.reflect.GeneratedMethodAccessor210.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:38)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:717)
    at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:928)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:871)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:864)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:94)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:341)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:176)
    at org.apache.zeppelin.scheduler.FIFOScheduler.run(FIFOScheduler.java:139)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

hbaseRDD.saveAsNewAPIHadoopDataset(getJob("HWBTEST").getConfiguration)

This solved the problem. Calling getJob("HWBTEST") inline means the Job object is never bound to a top-level variable, so the Zeppelin REPL never tries to render it with Job.toString; it is that toString call (Job.java:457 in the stack trace) that throws the IllegalStateException, because the job is still in the DEFINE state.
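
An equivalent workaround, if you still want a named value, is to bind only the Configuration rather than the Job itself, since printing a Configuration in the REPL does not go through Job.toString. This is just a minimal sketch reusing the getJob helper above; the hbaseConf name is only for illustration.

 %spark
// Bind only the Configuration, not the Job: the REPL's string rendering of a Configuration
// does not depend on job state, so no IllegalStateException is raised.
val hbaseConf = getJob("HWBTEST").getConfiguration
hbaseRDD.saveAsNewAPIHadoopDataset(hbaseConf)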