Avro:将 UNION 模式转换为 RECORD 模式

Avro: convert UNION schema to RECORD schema

我为简单的 class 层次结构自动生成了 Avro 模式:

trait T {def name: String}
case class A(name: String, value: Int) extends T
case class B(name: String, history: Array[String]) extends T

看起来像这样:

 [{
  "name": "org.example.schema.raw.A",
  "type": "record",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "value",
    "type": "int"
  }]
}, {
  "name": "org.example.schema.raw.B",
  "type": "record",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "history",
    "type": {
      "type": "array",
      "items": "string"
    }
  }]
}]

此架构适用于使用普通 Avro API 将数据从 JSON 读取到 GenericRecord。接下来我尝试实现的是使用 AvroParquetWriter:

将所有此类 GenericRecord 对象存储到单个镶木地板文件中
val writer = new AvroParquetWriter[GenericRecord](file, schema)
writer.write(record)
writer.close()

此代码在第一行失败

java.lang.IllegalArgumentException: Avro schema must be a record.
at parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:96)
at parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:137)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:86)

难怪,AvroSchemaConverter 包含以下行:

if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
      throw new IllegalArgumentException("Avro schema must be a record.");
}

我的架构类型是 UNION。非常感谢将此 UNION 模式映射(合并)到 RECORD 模式中的任何 ideas/help 或任何其他建议。

解决方案

1) 使用联合方案从输入读取 JSON 到 GenericRecord 2) 获取或创建 AvroParquetWriter 类型:

val writer = writers.getOrElseUpdate(record.getType, new AvroParquetWriter[GenericRecord](getPath(record.getType), record.getShema)

3) 将记录写入文件:

writer.write(record)

4) 当输入的所有数据都消耗完后关闭所有写入器:

writers.values.foreach(_.close())

5) 将目录中的数据加载到 Spark SQL DataFrame:

sqlContext.option("mergeSchema", "true").parquet("/tmp/data/")

6) 数据可以按原样处理或存储 - 它已经被 Spark 合并:

df.write.format("parquet").save("merged.parquet")

要回答有关合并的问题:您可以使用以下内容 case class Merged(name: String, value: Option[Int], history: Option[Array[String]]) 并使用其生成的架构来写入您的数据。 通常,如果您有一个同时兼容 A 和 B 的架构,那么它们都会正确写入。

或者,正如您所说,avro 不会让您将所有数据写入同一个文件,也许您可​​以按类型拆分输出,并为每种类型写入一个文件?我知道在我能想到的大多数用例中我可能会这样做,但也许它不适用于你。

你可以用一个案例 class 来包装你的特征,这是一个记录。

case class Reord[K](key: K, value: T)