java.io.NotSerializableException when saving an RDD to HBase using Spark Streaming
I keep running into java.io.NotSerializableException while processing data with Spark Streaming. My code is:
val hbase_conf = HBaseConfiguration.create()
hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
hbase_conf.set("hbase.zookeeper.quorum", "hadoop-zk0.s.qima-inc.com,hadoop-zk1.s.qima-inc.com,hadoop-zk2.s.qima-inc.com")
val newAPIJobConfiguration = Job.getInstance(hbase_conf);
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table");
newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
mydata.foreachRDD( rdd => {
  val json_rdd = rdd.map(Json.parse _ ).map(_.validate[Scan])
    .map(Scan.transformScanRestult _)
    .filter(_.nonEmpty)
    .map(_.get)
    .map(Scan.convertForHbase _ )
  json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration)
})
But it fails with java.io.NotSerializableException, and the error message is:
17/10/16 18:56:50 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
So I changed my code as follows:
object mytest_config {
  val hbase_conf = HBaseConfiguration.create()
  hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
  hbase_conf.set("hbase.zookeeper.quorum", "zk1,zk2")
  val newAPIJobConfiguration = Job.getInstance(hbase_conf);
  newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table");
  newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
  newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
}

mydata.foreachRDD( rdd => {
  val json_rdd = rdd.map(Json.parse _ )
    .map(_.validate[Scan])
    .map(Scan.transformScanRestult _)
    .filter(_.nonEmpty)
    .map(_.get)
    .map(Scan.convertForHbase _ )
  json_rdd.saveAsNewAPIHadoopDataset(mytest_config.newAPIJobConfiguration.getConfiguration)
})
This works! Can anyone explain why it works, and what is the officially recommended way to do this?
The error occurs because this newAPIJobConfiguration is initialized on the driver:

val newAPIJobConfiguration = Job.getInstance(hbase_conf);

and is then used inside the foreachRDD, i.e. in a closure that Spark Streaming has to serialize:

json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration)

Since org.apache.hadoop.mapreduce.Job is not serializable, serializing that closure fails with exactly the exception in the stack trace. Moving the configuration into an object fixes it because a Scala object is never captured by value: the closure refers to it only by name, and the object is initialized lazily in whichever JVM touches it, so the Job instance never has to be serialized.
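
If you would rather not introduce a singleton object, a minimal sketch of an alternative (not an official recommendation; it reuses the Json, Scan, and mydata definitions from the question, and the quorum/table values are placeholders) is to build the Job configuration inside the foreachRDD block, so no non-serializable Job is ever captured from the driver scope:

mydata.foreachRDD( rdd => {
  // Build the HBase configuration and Job per batch, inside the block,
  // so the closure only contains code, not a Job instance to serialize.
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2")

  val job = Job.getInstance(hbaseConf)
  job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table")
  job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
  job.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")

  val json_rdd = rdd.map(Json.parse _ )
    .map(_.validate[Scan])
    .map(Scan.transformScanRestult _)
    .filter(_.nonEmpty)
    .map(_.get)
    .map(Scan.convertForHbase _ )

  json_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
})

The trade-off is that the Job is recreated for every micro-batch, which is cheap but slightly more work than the lazily initialized object approach.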