Zeppelin: saving a DataFrame to an HBase table with the Spark interpreter
I need to build an analytics solution with Zeppelin that reads data from Oracle, Hadoop, and HBase. Now I need to save the analysis results to HBase through the Zeppelin Spark interpreter. I wrote a demo; it works in my Spark development environment, but it does not work in Zeppelin.
Can anyone help me? Thanks.
%spark
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
var sql="select * from analyticresult"
val sqlContext = sqlc
import sqlContext.implicits._
val list = sqlContext.sql(sql)
def getJob(tableName: String): Job = {
  sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "dn1.hadoop,dn2.hadoop,dn3.hadoop,dn4.hadoop,dn5.hadoop")
  sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
  sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
  val job = Job.getInstance(sc.hadoopConfiguration)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Result])
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
  job
}
val hbaseRDD = list.mapPartitions( iter => {
  iter.map( it => {
    var rowkey = it.getAs[String](8).getBytes
    var put = new Put(rowkey) // set rowkey
    put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("CI"), it.getAs[String](3).getBytes)
    put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("CT"), it.getAs[String](4).getBytes)
    //Bytes.toBytes(it.getLong(9))
    (new ImmutableBytesWritable, put)
  })
})
var table = getJob("HWBTEST")
hbaseRDD.saveAsNewAPIHadoopDataset(table.getConfiguration)
Zeppelin console: the code compiles fine, but an IllegalStateException is thrown:
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@38a0571d
import sqlContext.implicits._
list: org.apache.spark.sql.DataFrame = [cardid: string, lineid: string, amount: string, time: string, busline_name: string, vin: string, carno: string, carcolor: string, gpstime: string, systime: string, status: string, alarm: string, lon: string, lat: string, height: string, speed: string, buslineId: string, stype: string, stationSeq: string, stationName: string, stationMark: string, onPersons: string, offPersons: string, buslineName: string, between_time: double, rn: int]
getJob: (tableName: String)org.apache.hadoop.mapreduce.Job
hbaseRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Put)] = MapPartitionsRDD[276] at mapPartitions at <console>:137
java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:292)
at org.apache.hadoop.mapreduce.Job.toString(Job.java:457)
at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner(ScalaRunTime.scala:324)
at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
at .<init>(<console>:10)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at sun.reflect.GeneratedMethodAccessor210.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:38)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:717)
at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:928)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:871)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:864)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:94)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:341)
at org.apache.zeppelin.scheduler.Job.run(Job.java:176)
at org.apache.zeppelin.scheduler.FIFOScheduler.run(FIFOScheduler.java:139)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
hbaseRDD.saveAsNewAPIHadoopDataset(getJob("HWBTEST").getConfiguration)
This solved the problem. The exception does not come from saveAsNewAPIHadoopDataset itself: binding the Job to a top-level variable (var table = getJob("HWBTEST")) makes the Zeppelin Spark REPL print that variable, and Job.toString calls ensureState, which fails because the Job is still in the DEFINE state rather than RUNNING. Passing getJob("HWBTEST").getConfiguration directly means no Job is ever bound and printed at the top level.
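For reference, a minimal sketch of an alternative workaround (the helper name getHBaseConf is an assumption, and it reuses the sc, imports, and hbaseRDD defined above): have the helper return the Configuration rather than the Job, so nothing the REPL prints can trigger ensureState.

%spark
// Sketch only: getHBaseConf is a hypothetical helper; sc, hbaseRDD, and the imports
// come from the paragraph above.
def getHBaseConf(tableName: String): org.apache.hadoop.conf.Configuration = {
  sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "dn1.hadoop,dn2.hadoop,dn3.hadoop,dn4.hadoop,dn5.hadoop")
  sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
  sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
  val job = Job.getInstance(sc.hadoopConfiguration)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Result])
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
  job.getConfiguration // return only the Configuration; its toString is safe for the REPL to print
}
val hbaseConf = getHBaseConf("HWBTEST")
hbaseRDD.saveAsNewAPIHadoopDataset(hbaseConf)

Either way, the key point is the same: do not leave a bare org.apache.hadoop.mapreduce.Job as the last expression or a top-level binding in a Zeppelin paragraph.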