如何使用 Spark 在 HBase 中使任务可序列化

How to make Task Serializable in HBase using Spark

我尝试使用 Spark 在 HBase 中写入数据,但出现异常 Exception in thread "main" org.apache.spark.SparkException: Task not serializable。我试图使用以下代码片段在每个工作节点上打开连接:

 val conf = HBaseConfiguration.create()
 val tableName = args(1)
 conf.set(TableInputFormat.INPUT_TABLE, tableName)
 val admin = new HBaseAdmin(conf)
 val tableDesc = new HTableDescriptor(tableName)
 val columnDesc = new HColumnDescriptor("cf".getBytes()).setBloomFilterType(BloomType.ROWCOL).setMaxVersions(5)
 tableDesc.addFamily(columnDesc)
 admin.createTable(tableDesc)

 rddData.foreachPartition( part => {
    val table = new HTable(conf, tableName)
    part.foreach( elem => {
      var put = new Put(Bytes.toBytes(elem._1))
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(elem._2))
      table.put(put)
   })
   table.flushCommits()
 })

如何在使用 spark 在 HBase 上写入时使任务可序列化?

如果我没记错的话conf(hadoop配置实例)是不可序列化的。

以所有不可序列化的部分都在 foreachPartition 块中的方式编写代码(以便它在节点上执行)。这是我创建第二个 conf 等的示例。:

`

rddData.foreachPartition( part => {
     val conf2 = HBaseConfiguration.create()
     val tableName2 = args(1)
     conf2.set(TableInputFormat.INPUT_TABLE, tableName2)
     val table2 = new HTable(conf2, tableName2)
     part.foreach( elem => {
      var put = new Put(Bytes.toBytes(elem._1))
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(elem._2))
      table2.put(put)
   })
   table2.flushCommits()
 })

`