如何使用 avro 在镶木地板文件架构中创建重复类型?
how to created REPEATED type in parquet file schema with avro?
我们正在创建数据流管道,我们将从 postgres 读取数据并将其写入 parquet 文件。 ParquetIO.Sink 允许您将 GenericRecord 的 PCollection 写入 Parquet 文件(来自此处 https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html)。但是 parquet 文件架构并不像我预期的那样
这是我的架构:
schema = new org.apache.avro.Schema.Parser().parse("{\n" +
" \"type\": \"record\",\n" +
" \"namespace\": \"com.example\",\n" +
" \"name\": \"Patterns\",\n" +
" \"fields\": [\n" +
" { \"name\": \"id\", \"type\": \"string\" },\n" +
" { \"name\": \"name\", \"type\": \"string\" },\n" +
" { \"name\": \"createdAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +
" { \"name\": \"updatedAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +
" { \"name\": \"steps\", \"type\": [\"null\",{\"type\":\"array\",\"items\":{\"type\":\"string\",\"name\":\"json\"}}] },\n" +
" ]\n" +
"}");
到目前为止,这是我的代码:
Pipeline p = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
p.apply(JdbcIO.<GenericRecord> read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"org.postgresql.Driver", "jdbc:postgresql://localhost:port/database")
.withUsername("username")
.withPassword("password"))
.withQuery("select * from table limit(10)")
.withCoder(AvroCoder.of(schema))
.withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> {
GenericRecord record = new GenericData.Record(schema);
ResultSetMetaData metadata = resultSet.getMetaData();
int columnsNumber = metadata.getColumnCount();
for(int i=0; i<columnsNumber; i++) {
Object columnValue = resultSet.getObject(i+1);
if(columnValue instanceof UUID) columnValue=columnValue.toString();
if(columnValue instanceof Timestamp) columnValue=columnValue.toString();
if(columnValue instanceof PgArray) {
Object[] array = (Object[]) ((PgArray) columnValue).getArray();
List list=new ArrayList();
for (Object d : array) {
if(d instanceof PGobject) {
list.add(((PGobject) d).getValue());
}
}
columnValue = list;
}
record.put(i, columnValue);
}
return record;
}))
.apply(FileIO.<GenericRecord>write()
.via(ParquetIO.sink(schema).withCompressionCodec(CompressionCodecName.SNAPPY))
.to("something.parquet")
);
p.run();
这是我得到的:
message com.example.table {
required binary id (UTF8);
required binary name (UTF8);
required binary createdAt (UTF8);
required binary updatedAt (UTF8);
optional group someArray (LIST) {
repeated binary array (UTF8);
}
}
这是我所期望的:
message com.example.table {
required binary id (UTF8);
required binary name (UTF8);
required binary createdAt (UTF8);
required binary updatedAt (UTF8);
optional repeated binary someArray(UTF8);
}
请帮忙
它是您用来描述预期模式的 protobuf 消息吗?我认为您得到的是从指定的 JSON 模式正确生成的。 optional repeated
在 protobuf 语言规范中没有意义:https://developers.google.com/protocol-buffers/docs/reference/proto2-spec
您可以删除 null
和方括号以生成简单的 repeated
字段,它在语义上等同于 optional repeated
(因为 repeated
表示零次或多次)。
我没有找到从 Avro 创建不在 GroupType 中的重复元素的方法。
Beam 中的 ParquetIO 使用 parquet-mr
项目中定义的 "standard" avro 转换,已实现 here。
似乎有两种方法可以将 Avro ARRAY 字段转换为 Parquet 消息 -- 但是 它们都不能创建您要查找的内容。
目前,avro 转换是 唯一 与 ParquetIO 交互的方式。我看到这个 JIRA Use Beam schema in ParquetIO 将其扩展到 Beam Rows,这可能允许不同的 parquet 消息策略。
或者,您可以为 ParquetIO 创建 JIRA 功能请求以支持 thrift 结构,这应该允许更好地控制 parquet 结构。
我们正在创建数据流管道,我们将从 postgres 读取数据并将其写入 parquet 文件。 ParquetIO.Sink 允许您将 GenericRecord 的 PCollection 写入 Parquet 文件(来自此处 https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html)。但是 parquet 文件架构并不像我预期的那样
这是我的架构:
schema = new org.apache.avro.Schema.Parser().parse("{\n" +
" \"type\": \"record\",\n" +
" \"namespace\": \"com.example\",\n" +
" \"name\": \"Patterns\",\n" +
" \"fields\": [\n" +
" { \"name\": \"id\", \"type\": \"string\" },\n" +
" { \"name\": \"name\", \"type\": \"string\" },\n" +
" { \"name\": \"createdAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +
" { \"name\": \"updatedAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +
" { \"name\": \"steps\", \"type\": [\"null\",{\"type\":\"array\",\"items\":{\"type\":\"string\",\"name\":\"json\"}}] },\n" +
" ]\n" +
"}");
到目前为止,这是我的代码:
Pipeline p = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
p.apply(JdbcIO.<GenericRecord> read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"org.postgresql.Driver", "jdbc:postgresql://localhost:port/database")
.withUsername("username")
.withPassword("password"))
.withQuery("select * from table limit(10)")
.withCoder(AvroCoder.of(schema))
.withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> {
GenericRecord record = new GenericData.Record(schema);
ResultSetMetaData metadata = resultSet.getMetaData();
int columnsNumber = metadata.getColumnCount();
for(int i=0; i<columnsNumber; i++) {
Object columnValue = resultSet.getObject(i+1);
if(columnValue instanceof UUID) columnValue=columnValue.toString();
if(columnValue instanceof Timestamp) columnValue=columnValue.toString();
if(columnValue instanceof PgArray) {
Object[] array = (Object[]) ((PgArray) columnValue).getArray();
List list=new ArrayList();
for (Object d : array) {
if(d instanceof PGobject) {
list.add(((PGobject) d).getValue());
}
}
columnValue = list;
}
record.put(i, columnValue);
}
return record;
}))
.apply(FileIO.<GenericRecord>write()
.via(ParquetIO.sink(schema).withCompressionCodec(CompressionCodecName.SNAPPY))
.to("something.parquet")
);
p.run();
这是我得到的:
message com.example.table {
required binary id (UTF8);
required binary name (UTF8);
required binary createdAt (UTF8);
required binary updatedAt (UTF8);
optional group someArray (LIST) {
repeated binary array (UTF8);
}
}
这是我所期望的:
message com.example.table {
required binary id (UTF8);
required binary name (UTF8);
required binary createdAt (UTF8);
required binary updatedAt (UTF8);
optional repeated binary someArray(UTF8);
}
请帮忙
它是您用来描述预期模式的 protobuf 消息吗?我认为您得到的是从指定的 JSON 模式正确生成的。 optional repeated
在 protobuf 语言规范中没有意义:https://developers.google.com/protocol-buffers/docs/reference/proto2-spec
您可以删除 null
和方括号以生成简单的 repeated
字段,它在语义上等同于 optional repeated
(因为 repeated
表示零次或多次)。
我没有找到从 Avro 创建不在 GroupType 中的重复元素的方法。
Beam 中的 ParquetIO 使用 parquet-mr
项目中定义的 "standard" avro 转换,已实现 here。
似乎有两种方法可以将 Avro ARRAY 字段转换为 Parquet 消息 -- 但是 它们都不能创建您要查找的内容。
目前,avro 转换是 唯一 与 ParquetIO 交互的方式。我看到这个 JIRA Use Beam schema in ParquetIO 将其扩展到 Beam Rows,这可能允许不同的 parquet 消息策略。
或者,您可以为 ParquetIO 创建 JIRA 功能请求以支持 thrift 结构,这应该允许更好地控制 parquet 结构。