使用 reader 架构将 Avro 文件转换为 JSON
Convert Avro file to JSON with reader schema
我想在命令行上反序列化 Avro 数据 使用不同于编写器模式 reader 的模式 。我可以在序列化时指定编写器架构,但在反序列化期间不能。
record.json
(数据文件):
{"test1": 1, "test2": 2}
writer.avsc
(作者模式):
{
"type": "record",
"name": "pouac",
"fields": [
{
"name": "test1",
"type": "int"
},
{
"name": "test2",
"type": "int"
}
]
}
reader.avsc
(reader 架构):
{
"type": "record",
"name": "pouac",
"fields": [{
"name": "test2",
"type": "int",
"aliases": ["test1"]
}]
}
序列化数据:
$ java -jar avro-tools-1.8.2.jar fromjson --schema-file writer.avsc record.json > record.avro
为了反序列化数据,我尝试了以下方法:
$ java -jar avro-tools-1.8.2.jar tojson --schema-file reader.avsc record.avro
Exception in thread "main" joptsimple.UnrecognizedOptionException: 'schema-file' is not a recognized option
...
我主要是在寻找命令行指令,因为我不太习惯编写 Java 代码,但我很乐意使用 Java 代码来自己编译。实际上,我感兴趣的是确切的反序列化结果。 (更基本的问题在 this conversation 中描述,在我打开以实现别名的 fastavro PR 中)
你可以 运行 java -jar avro-tools-1.8.2.jar tojson
查看帮助,它告诉你可以像这样使用这个命令:
java -jar avro-tools-1.8.2.jar tojson record.avro > tost.json
这将输出到文件:
{"test1":1,"test2":2}
您也可以使用 --pretty
参数调用它:
java -jar avro-tools-1.8.2.jar tojson --pretty record.avro > tost.json
输出会很漂亮:
{
"test1" : 1,
"test2" : 2
}
avro-tools tojson
目标仅用作将二进制编码的 Avro 文件转换为 JSON 的转储工具。架构始终伴随 Avro 文件中的记录,如下面 link 中所述。因此它不能被 avro-tools 覆盖。
http://avro.apache.org/docs/1.8.2/#compare
我不知道有什么独立的工具可以用来实现你想要的。我认为您需要进行一些编程才能达到预期的结果。 Avro 支持多种语言,包括 Python,但跨语言的功能并不统一。 Java在我的经验中是最先进的。例如 Python 无法在 DataFileReader 上指定 reader 模式,这将有助于实现您想要的:
https://github.com/apache/avro/blob/master/lang/py/src/avro/datafile.py#L224
您在 Python 中最接近的是以下内容;
import avro.schema as avsc
import avro.datafile as avdf
import avro.io as avio
reader_schema = avsc.parse(open("reader.avsc", "rb").read())
# need ability to inject reader schema as 3rd arg
with avdf.DataFileReader(open("record.avro", "rb"), avio.DatumReader()) as reader:
for record in reader:
print record
根据您概述的模式和数据。预期的行为应该是 undefined,因此发出 error.
可以使用以下 Java 代码验证此行为;
package ca.junctionbox.soavro;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidationStrategy;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import java.util.ArrayList;
public class Main {
public static final String V1 = "{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"pouac\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"test1\",\n" +
" \"type\": \"int\"\n" +
" },\n" +
" {\n" +
" \"name\": \"test2\",\n" +
" \"type\": \"int\"\n" +
" }\n" +
" ]\n" +
"}";
public static final String V2 = "{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"pouac\",\n" +
" \"fields\": [{\n" +
" \"name\": \"test2\",\n" +
" \"type\": \"int\",\n" +
" \"aliases\": [\"test1\"]\n" +
" }]\n" +
"}";
public static void main(final String[] args) {
final SchemaValidator sv = new SchemaValidatorBuilder()
.canBeReadStrategy()
.validateAll();
final Schema sv1 = new Schema.Parser().parse(V1);
final Schema sv2 = new Schema.Parser().parse(V2);
final ArrayList<Schema> existing = new ArrayList<>();
existing.add(sv1);
try {
sv.validate(sv2, existing);
System.out.println("Good to go!");
} catch (SchemaValidationException e) {
e.printStackTrace();
}
}
}
这会产生以下输出:
org.apache.avro.SchemaValidationException: Unable to read schema:
{
"type" : "record",
"name" : "pouac",
"fields" : [ {
"name" : "test2",
"type" : "int",
"aliases" : [ "test1" ]
} ]
}
using schema:
{
"type" : "record",
"name" : "pouac",
"fields" : [ {
"name" : "test1",
"type" : "int"
}, {
"name" : "test2",
"type" : "int"
} ]
}
at org.apache.avro.ValidateMutualRead.canRead(ValidateMutualRead.java:70)
at org.apache.avro.ValidateCanBeRead.validate(ValidateCanBeRead.java:39)
at org.apache.avro.ValidateAll.validate(ValidateAll.java:51)
at ca.junctionbox.soavro.Main.main(Main.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:294)
at java.lang.Thread.run(Thread.java:748)
别名通常用于模式演变中的向后兼容性,允许从 disparate/legacy 键映射到通用键名。鉴于您的编写器模式不会通过使用联合将 test1 和 test2 字段视为 "optional" 我看不到您想要这种转换的场景。如果您想 "drop" test1 字段,则可以通过将其从 v2 模式规范中排除来实现。任何可以应用 reader 方案的 reader 都会使用 v2 方案定义忽略 test1。
为了说明我所说的进化的意思;
v1 架构
{
"type": "record",
"name": "pouac",
"fields": [
{
"name": "test1",
"type": "int"
}]
}
v2 架构
{
"type": "record",
"name": "pouac",
"fields": [
{
"name": "test2",
"type": "int",
"aliases": ["test1"]
}]
}
您可能拥有 v1 格式的 TB 数据,并引入 v2 格式,它将 test1 字段重命名为 test2。别名将允许您对 v1 和 v2 数据执行 map-reduce 作业、Hive 查询等,而无需先主动重写所有旧的 v1 数据。请注意,这假设字段的类型和语义没有变化。
我想在命令行上反序列化 Avro 数据 使用不同于编写器模式 reader 的模式 。我可以在序列化时指定编写器架构,但在反序列化期间不能。
record.json
(数据文件):
{"test1": 1, "test2": 2}
writer.avsc
(作者模式):
{
"type": "record",
"name": "pouac",
"fields": [
{
"name": "test1",
"type": "int"
},
{
"name": "test2",
"type": "int"
}
]
}
reader.avsc
(reader 架构):
{
"type": "record",
"name": "pouac",
"fields": [{
"name": "test2",
"type": "int",
"aliases": ["test1"]
}]
}
序列化数据:
$ java -jar avro-tools-1.8.2.jar fromjson --schema-file writer.avsc record.json > record.avro
为了反序列化数据,我尝试了以下方法:
$ java -jar avro-tools-1.8.2.jar tojson --schema-file reader.avsc record.avro
Exception in thread "main" joptsimple.UnrecognizedOptionException: 'schema-file' is not a recognized option
...
我主要是在寻找命令行指令,因为我不太习惯编写 Java 代码,但我很乐意使用 Java 代码来自己编译。实际上,我感兴趣的是确切的反序列化结果。 (更基本的问题在 this conversation 中描述,在我打开以实现别名的 fastavro PR 中)
你可以 运行 java -jar avro-tools-1.8.2.jar tojson
查看帮助,它告诉你可以像这样使用这个命令:
java -jar avro-tools-1.8.2.jar tojson record.avro > tost.json
这将输出到文件:
{"test1":1,"test2":2}
您也可以使用 --pretty
参数调用它:
java -jar avro-tools-1.8.2.jar tojson --pretty record.avro > tost.json
输出会很漂亮:
{
"test1" : 1,
"test2" : 2
}
avro-tools tojson
目标仅用作将二进制编码的 Avro 文件转换为 JSON 的转储工具。架构始终伴随 Avro 文件中的记录,如下面 link 中所述。因此它不能被 avro-tools 覆盖。
http://avro.apache.org/docs/1.8.2/#compare
我不知道有什么独立的工具可以用来实现你想要的。我认为您需要进行一些编程才能达到预期的结果。 Avro 支持多种语言,包括 Python,但跨语言的功能并不统一。 Java在我的经验中是最先进的。例如 Python 无法在 DataFileReader 上指定 reader 模式,这将有助于实现您想要的:
https://github.com/apache/avro/blob/master/lang/py/src/avro/datafile.py#L224
您在 Python 中最接近的是以下内容;
import avro.schema as avsc
import avro.datafile as avdf
import avro.io as avio
reader_schema = avsc.parse(open("reader.avsc", "rb").read())
# need ability to inject reader schema as 3rd arg
with avdf.DataFileReader(open("record.avro", "rb"), avio.DatumReader()) as reader:
for record in reader:
print record
根据您概述的模式和数据。预期的行为应该是 undefined,因此发出 error.
可以使用以下 Java 代码验证此行为;
package ca.junctionbox.soavro;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidationStrategy;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import java.util.ArrayList;
public class Main {
public static final String V1 = "{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"pouac\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"test1\",\n" +
" \"type\": \"int\"\n" +
" },\n" +
" {\n" +
" \"name\": \"test2\",\n" +
" \"type\": \"int\"\n" +
" }\n" +
" ]\n" +
"}";
public static final String V2 = "{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"pouac\",\n" +
" \"fields\": [{\n" +
" \"name\": \"test2\",\n" +
" \"type\": \"int\",\n" +
" \"aliases\": [\"test1\"]\n" +
" }]\n" +
"}";
public static void main(final String[] args) {
final SchemaValidator sv = new SchemaValidatorBuilder()
.canBeReadStrategy()
.validateAll();
final Schema sv1 = new Schema.Parser().parse(V1);
final Schema sv2 = new Schema.Parser().parse(V2);
final ArrayList<Schema> existing = new ArrayList<>();
existing.add(sv1);
try {
sv.validate(sv2, existing);
System.out.println("Good to go!");
} catch (SchemaValidationException e) {
e.printStackTrace();
}
}
}
这会产生以下输出:
org.apache.avro.SchemaValidationException: Unable to read schema:
{
"type" : "record",
"name" : "pouac",
"fields" : [ {
"name" : "test2",
"type" : "int",
"aliases" : [ "test1" ]
} ]
}
using schema:
{
"type" : "record",
"name" : "pouac",
"fields" : [ {
"name" : "test1",
"type" : "int"
}, {
"name" : "test2",
"type" : "int"
} ]
}
at org.apache.avro.ValidateMutualRead.canRead(ValidateMutualRead.java:70)
at org.apache.avro.ValidateCanBeRead.validate(ValidateCanBeRead.java:39)
at org.apache.avro.ValidateAll.validate(ValidateAll.java:51)
at ca.junctionbox.soavro.Main.main(Main.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:294)
at java.lang.Thread.run(Thread.java:748)
别名通常用于模式演变中的向后兼容性,允许从 disparate/legacy 键映射到通用键名。鉴于您的编写器模式不会通过使用联合将 test1 和 test2 字段视为 "optional" 我看不到您想要这种转换的场景。如果您想 "drop" test1 字段,则可以通过将其从 v2 模式规范中排除来实现。任何可以应用 reader 方案的 reader 都会使用 v2 方案定义忽略 test1。
为了说明我所说的进化的意思;
v1 架构
{
"type": "record",
"name": "pouac",
"fields": [
{
"name": "test1",
"type": "int"
}]
}
v2 架构
{
"type": "record",
"name": "pouac",
"fields": [
{
"name": "test2",
"type": "int",
"aliases": ["test1"]
}]
}
您可能拥有 v1 格式的 TB 数据,并引入 v2 格式,它将 test1 字段重命名为 test2。别名将允许您对 v1 和 v2 数据执行 map-reduce 作业、Hive 查询等,而无需先主动重写所有旧的 v1 数据。请注意,这假设字段的类型和语义没有变化。