Avro:将 UNION 模式转换为 RECORD 模式
Avro: convert UNION schema to RECORD schema
我为简单的 class 层次结构自动生成了 Avro 模式:
trait T {def name: String}
case class A(name: String, value: Int) extends T
case class B(name: String, history: Array[String]) extends T
看起来像这样:
[{
"name": "org.example.schema.raw.A",
"type": "record",
"fields": [{
"name": "name",
"type": "string"
}, {
"name": "value",
"type": "int"
}]
}, {
"name": "org.example.schema.raw.B",
"type": "record",
"fields": [{
"name": "name",
"type": "string"
}, {
"name": "history",
"type": {
"type": "array",
"items": "string"
}
}]
}]
此架构适用于使用普通 Avro API 将数据从 JSON 读取到 GenericRecord
。接下来我尝试实现的是使用 AvroParquetWriter
:
将所有此类 GenericRecord
对象存储到单个镶木地板文件中
val writer = new AvroParquetWriter[GenericRecord](file, schema)
writer.write(record)
writer.close()
此代码在第一行失败
java.lang.IllegalArgumentException: Avro schema must be a record.
at parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:96)
at parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:137)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:86)
难怪,AvroSchemaConverter 包含以下行:
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
}
我的架构类型是 UNION。非常感谢将此 UNION 模式映射(合并)到 RECORD 模式中的任何 ideas/help 或任何其他建议。
解决方案
1) 使用联合方案从输入读取 JSON 到 GenericRecord
2) 获取或创建 AvroParquetWriter
类型:
val writer = writers.getOrElseUpdate(record.getType, new AvroParquetWriter[GenericRecord](getPath(record.getType), record.getShema)
3) 将记录写入文件:
writer.write(record)
4) 当输入的所有数据都消耗完后关闭所有写入器:
writers.values.foreach(_.close())
5) 将目录中的数据加载到 Spark SQL DataFrame:
sqlContext.option("mergeSchema", "true").parquet("/tmp/data/")
6) 数据可以按原样处理或存储 - 它已经被 Spark 合并:
df.write.format("parquet").save("merged.parquet")
要回答有关合并的问题:您可以使用以下内容 case class Merged(name: String, value: Option[Int], history: Option[Array[String]])
并使用其生成的架构来写入您的数据。
通常,如果您有一个同时兼容 A 和 B 的架构,那么它们都会正确写入。
或者,正如您所说,avro 不会让您将所有数据写入同一个文件,也许您可以按类型拆分输出,并为每种类型写入一个文件?我知道在我能想到的大多数用例中我可能会这样做,但也许它不适用于你。
你可以用一个案例 class 来包装你的特征,这是一个记录。
case class Reord[K](key: K, value: T)
我为简单的 class 层次结构自动生成了 Avro 模式:
trait T {def name: String}
case class A(name: String, value: Int) extends T
case class B(name: String, history: Array[String]) extends T
看起来像这样:
[{
"name": "org.example.schema.raw.A",
"type": "record",
"fields": [{
"name": "name",
"type": "string"
}, {
"name": "value",
"type": "int"
}]
}, {
"name": "org.example.schema.raw.B",
"type": "record",
"fields": [{
"name": "name",
"type": "string"
}, {
"name": "history",
"type": {
"type": "array",
"items": "string"
}
}]
}]
此架构适用于使用普通 Avro API 将数据从 JSON 读取到 GenericRecord
。接下来我尝试实现的是使用 AvroParquetWriter
:
GenericRecord
对象存储到单个镶木地板文件中
val writer = new AvroParquetWriter[GenericRecord](file, schema)
writer.write(record)
writer.close()
此代码在第一行失败
java.lang.IllegalArgumentException: Avro schema must be a record.
at parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:96)
at parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:137)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:86)
难怪,AvroSchemaConverter 包含以下行:
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
}
我的架构类型是 UNION。非常感谢将此 UNION 模式映射(合并)到 RECORD 模式中的任何 ideas/help 或任何其他建议。
解决方案
1) 使用联合方案从输入读取 JSON 到 GenericRecord
2) 获取或创建 AvroParquetWriter
类型:
val writer = writers.getOrElseUpdate(record.getType, new AvroParquetWriter[GenericRecord](getPath(record.getType), record.getShema)
3) 将记录写入文件:
writer.write(record)
4) 当输入的所有数据都消耗完后关闭所有写入器:
writers.values.foreach(_.close())
5) 将目录中的数据加载到 Spark SQL DataFrame:
sqlContext.option("mergeSchema", "true").parquet("/tmp/data/")
6) 数据可以按原样处理或存储 - 它已经被 Spark 合并:
df.write.format("parquet").save("merged.parquet")
要回答有关合并的问题:您可以使用以下内容 case class Merged(name: String, value: Option[Int], history: Option[Array[String]])
并使用其生成的架构来写入您的数据。
通常,如果您有一个同时兼容 A 和 B 的架构,那么它们都会正确写入。
或者,正如您所说,avro 不会让您将所有数据写入同一个文件,也许您可以按类型拆分输出,并为每种类型写入一个文件?我知道在我能想到的大多数用例中我可能会这样做,但也许它不适用于你。
你可以用一个案例 class 来包装你的特征,这是一个记录。
case class Reord[K](key: K, value: T)