读取保存在 HBase 列中的 AVRO 结构
Read AVRO structures saved in HBase columns
我是 Spark 和 HBase 的新手。我正在处理 HBase table 的备份。这些备份位于 S3 存储桶中。我正在使用 newAPIHadoopFile 通过 spark(scala) 阅读它们,如下所示:
conf.set("io.serializations", "org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.hbase.mapreduce.ResultSerialization")
val data = sc.newAPIHadoopFile(path,classOf[SequenceFileInputFormat[ImmutableBytesWritable, Result]], classOf[ImmutableBytesWritable], classOf[Result], conf)
有问题的 table 称为 Emps。 Emps 的模式是:
key: empid {COMPRESSION => 'gz' }
family: data
dob - date of birth of this employee.
e_info - avro structure for storing emp info.
e_dept- avro structure for storing info about dept.
family: extra - Extra Metadata {NAME => 'extra', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'SNAPPY', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
e_region - emp region
e_status - some data about his achievements
.
.
some more meta data
table 中有些列包含简单的字符串数据,有些列包含 AVRO 结构。
我正在尝试直接从 S3 中的 HBase 备份文件中读取这些数据。我不想在本地计算机中重新创建此 HBase table,因为 table 非常非常大。
这就是我试图阅读的方式:
data.keys.map{k=>(new String(k.get()))}.take(1)
res1: Array[String] = Array(111111111100011010102462)
data.values.map{ v =>{ for(cell <- v.rawCells()) yield{
val family = CellUtil.cloneFamily(cell);
val column = CellUtil.cloneQualifier(cell);
val value = CellUtil.cloneValue(cell);
new String(family) +"->"+ new String(column)+ "->"+ new String(value)
}
}
}.take(1)
res2: Array[Array[String]] = Array(Array(info->dob->01/01/1996, info->e_info->?ж�?�ո� ?�� ???̶�?�ո� ?�� ????, info->e_dept->?ж�??�ո� ?̶�??�ո� �ո� ??, extra->e_region-> CA, extra->e_status->, .....))
不出所料,我可以正确看到简单的字符串数据,但 AVRO 数据是垃圾。
我尝试使用 GenericDatumReader:
读取 AVRO 结构
data.values.map{ v =>{ for(cell <- v.rawCells()) yield{
val family = new String(CellUtil.cloneFamily(cell));
val column = new String(CellUtil.cloneQualifier(cell));
val value = CellUtil.cloneValue(cell);
if(column=="e_info"){
var schema_obj = new Schema.Parser
//schema_e_info contains the AVRO schema for e_info
var schema = schema_obj.parse(schema_e_info)
var READER2 = new GenericDatumReader[GenericRecord](schema)
var datum= READER2.read(null, DecoderFactory.defaultFactory.createBinaryDecoder(value,null))
var result=datum.get("type").toString()
family +"->"+column+ "->"+ new String(result) + "\n"
}
else
family +"->"+column+ "->"+ new String(value)+"\n"
}
}
}
但这给了我以下错误:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
... 74 elided
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
Serialization stack:
- object not serializable (class: org.apache.avro.Schema$RecordSchema, value: .....
所以我想问一下:
- 有没有办法让不可序列化的 class RecordSchema 与 map 函数一起工作?
- 到目前为止我的做法对吗?我很高兴知道处理此类数据的更好方法。
- 我读到在 Dataframe 中处理这个会容易得多。我试图将如此形成的 Hadoop RDD 转换为 Dataframe,但我又是 运行 盲目地在那里。
如异常所述 - 模式是不可序列化的。你能在映射器函数中初始化它吗?这样它就不需要从驱动程序发送到执行程序。
或者,您也可以创建一个包含模式的 Scala 单例对象。您会在每个执行程序上初始化一个 Scala 单例,因此当您从单例访问任何成员时,它不需要序列化并通过网络发送。这避免了为数据中的每一行重新创建架构的不必要开销。
只是为了检查您是否可以正常读取数据 - 您还可以在执行程序上将其转换为字节数组,在驱动程序中收集它并在驱动程序中进行反序列化(解析 AVRO 数据)代码。但这显然不会扩展,只是为了确保您的数据看起来不错,并在您编写原型代码以提取数据时避免与 spark 相关的并发症。
我是 Spark 和 HBase 的新手。我正在处理 HBase table 的备份。这些备份位于 S3 存储桶中。我正在使用 newAPIHadoopFile 通过 spark(scala) 阅读它们,如下所示:
conf.set("io.serializations", "org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.hbase.mapreduce.ResultSerialization")
val data = sc.newAPIHadoopFile(path,classOf[SequenceFileInputFormat[ImmutableBytesWritable, Result]], classOf[ImmutableBytesWritable], classOf[Result], conf)
有问题的 table 称为 Emps。 Emps 的模式是:
key: empid {COMPRESSION => 'gz' }
family: data
dob - date of birth of this employee.
e_info - avro structure for storing emp info.
e_dept- avro structure for storing info about dept.
family: extra - Extra Metadata {NAME => 'extra', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'SNAPPY', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
e_region - emp region
e_status - some data about his achievements
.
.
some more meta data
table 中有些列包含简单的字符串数据,有些列包含 AVRO 结构。
我正在尝试直接从 S3 中的 HBase 备份文件中读取这些数据。我不想在本地计算机中重新创建此 HBase table,因为 table 非常非常大。
这就是我试图阅读的方式:
data.keys.map{k=>(new String(k.get()))}.take(1)
res1: Array[String] = Array(111111111100011010102462)
data.values.map{ v =>{ for(cell <- v.rawCells()) yield{
val family = CellUtil.cloneFamily(cell);
val column = CellUtil.cloneQualifier(cell);
val value = CellUtil.cloneValue(cell);
new String(family) +"->"+ new String(column)+ "->"+ new String(value)
}
}
}.take(1)
res2: Array[Array[String]] = Array(Array(info->dob->01/01/1996, info->e_info->?ж�?�ո� ?�� ???̶�?�ո� ?�� ????, info->e_dept->?ж�??�ո� ?̶�??�ո� �ո� ??, extra->e_region-> CA, extra->e_status->, .....))
不出所料,我可以正确看到简单的字符串数据,但 AVRO 数据是垃圾。
我尝试使用 GenericDatumReader:
读取 AVRO 结构data.values.map{ v =>{ for(cell <- v.rawCells()) yield{
val family = new String(CellUtil.cloneFamily(cell));
val column = new String(CellUtil.cloneQualifier(cell));
val value = CellUtil.cloneValue(cell);
if(column=="e_info"){
var schema_obj = new Schema.Parser
//schema_e_info contains the AVRO schema for e_info
var schema = schema_obj.parse(schema_e_info)
var READER2 = new GenericDatumReader[GenericRecord](schema)
var datum= READER2.read(null, DecoderFactory.defaultFactory.createBinaryDecoder(value,null))
var result=datum.get("type").toString()
family +"->"+column+ "->"+ new String(result) + "\n"
}
else
family +"->"+column+ "->"+ new String(value)+"\n"
}
}
}
但这给了我以下错误:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
... 74 elided
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
Serialization stack:
- object not serializable (class: org.apache.avro.Schema$RecordSchema, value: .....
所以我想问一下:
- 有没有办法让不可序列化的 class RecordSchema 与 map 函数一起工作?
- 到目前为止我的做法对吗?我很高兴知道处理此类数据的更好方法。
- 我读到在 Dataframe 中处理这个会容易得多。我试图将如此形成的 Hadoop RDD 转换为 Dataframe,但我又是 运行 盲目地在那里。
如异常所述 - 模式是不可序列化的。你能在映射器函数中初始化它吗?这样它就不需要从驱动程序发送到执行程序。
或者,您也可以创建一个包含模式的 Scala 单例对象。您会在每个执行程序上初始化一个 Scala 单例,因此当您从单例访问任何成员时,它不需要序列化并通过网络发送。这避免了为数据中的每一行重新创建架构的不必要开销。
只是为了检查您是否可以正常读取数据 - 您还可以在执行程序上将其转换为字节数组,在驱动程序中收集它并在驱动程序中进行反序列化(解析 AVRO 数据)代码。但这显然不会扩展,只是为了确保您的数据看起来不错,并在您编写原型代码以提取数据时避免与 spark 相关的并发症。