如何使用 Spark 在 HBase 中使任务可序列化
How to make Task Serializable in HBase using Spark
我尝试使用 Spark 在 HBase 中写入数据,但出现异常 Exception in thread "main" org.apache.spark.SparkException: Task not serializable
。我试图使用以下代码片段在每个工作节点上打开连接:
val conf = HBaseConfiguration.create()
val tableName = args(1)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(conf)
val tableDesc = new HTableDescriptor(tableName)
val columnDesc = new HColumnDescriptor("cf".getBytes()).setBloomFilterType(BloomType.ROWCOL).setMaxVersions(5)
tableDesc.addFamily(columnDesc)
admin.createTable(tableDesc)
rddData.foreachPartition( part => {
val table = new HTable(conf, tableName)
part.foreach( elem => {
var put = new Put(Bytes.toBytes(elem._1))
put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(elem._2))
table.put(put)
})
table.flushCommits()
})
如何在使用 spark 在 HBase 上写入时使任务可序列化?
如果我没记错的话conf(hadoop配置实例)是不可序列化的。
以所有不可序列化的部分都在 foreachPartition 块中的方式编写代码(以便它在节点上执行)。这是我创建第二个 conf 等的示例。:
`
rddData.foreachPartition( part => {
val conf2 = HBaseConfiguration.create()
val tableName2 = args(1)
conf2.set(TableInputFormat.INPUT_TABLE, tableName2)
val table2 = new HTable(conf2, tableName2)
part.foreach( elem => {
var put = new Put(Bytes.toBytes(elem._1))
put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(elem._2))
table2.put(put)
})
table2.flushCommits()
})
`
我尝试使用 Spark 在 HBase 中写入数据,但出现异常 Exception in thread "main" org.apache.spark.SparkException: Task not serializable
。我试图使用以下代码片段在每个工作节点上打开连接:
val conf = HBaseConfiguration.create()
val tableName = args(1)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(conf)
val tableDesc = new HTableDescriptor(tableName)
val columnDesc = new HColumnDescriptor("cf".getBytes()).setBloomFilterType(BloomType.ROWCOL).setMaxVersions(5)
tableDesc.addFamily(columnDesc)
admin.createTable(tableDesc)
rddData.foreachPartition( part => {
val table = new HTable(conf, tableName)
part.foreach( elem => {
var put = new Put(Bytes.toBytes(elem._1))
put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(elem._2))
table.put(put)
})
table.flushCommits()
})
如何在使用 spark 在 HBase 上写入时使任务可序列化?
如果我没记错的话conf(hadoop配置实例)是不可序列化的。
以所有不可序列化的部分都在 foreachPartition 块中的方式编写代码(以便它在节点上执行)。这是我创建第二个 conf 等的示例。:
`
rddData.foreachPartition( part => {
val conf2 = HBaseConfiguration.create()
val tableName2 = args(1)
conf2.set(TableInputFormat.INPUT_TABLE, tableName2)
val table2 = new HTable(conf2, tableName2)
part.foreach( elem => {
var put = new Put(Bytes.toBytes(elem._1))
put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(elem._2))
table2.put(put)
})
table2.flushCommits()
})
`