How can I use a custom Apache Avro field type in Python
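I have access to an Apache Kafka cluster and was given a file describing the Apache Avro serialization format of the messages. I'm writing a small test consumer in Python, and I get the following error when I try to parse the schema: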
SchemaParseException: Type property "{u'items': u'com.myapp.avromsg.common.MilestoneField', u'type': u'array'}" not a valid Avro schema: Items schema (com.myapp.avromsg.common.MilestoneField) not a valid Avro schema: Could not make an Avro Schema object from com.myapp.avromsg.common.MilestoneField. (known names: [u'com.myapp.avromsg.runstatus.RunStatusMessage'])
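It looks to me like the error happens because the parser doesn't know about the custom field type MilestoneField. How do I describe this field to my script so that the schema parses correctly?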
Here is the my_msg.avsc Avro file:
{
  "type": "record",
  "name": "RunStatusMessage",
  "namespace": "com.myapp.avromsg.runstatus",
  "fields": [
    {
      "name": "datasetID",
      "type": "string"
    },
    {
      "name": "runID",
      "type": ["string", "null"]
    },
    {
      "name": "registryRunID",
      "type": ["string", "null"]
    },
    {
      "name": "status",
      "type": "string"
    },
    {
      "name": "logs",
      "type": ["string", "null"]
    },
    {
      "name": "jobID",
      "type": ["string", "null"]
    },
    {
      "name": "validationsJson",
      "type": ["string", "null"]
    },
    {
      "name": "zone",
      "type": "string"
    },
    {
      "name": "milestoneFields",
      "type": {
        "type": "array",
        "items": "com.myapp.avromsg.common.MilestoneField"
      }
    },
    {
      "name": "ingestionParams",
      "type": {
        "type": "array",
        "items": "com.myapp.avromsg.common.MilestoneField"
      },
      "default": []
    },
    {
      "name": "timestamp",
      "type": [
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        },
        {
          "type": "bytes",
          "logicalType": "decimal",
          "precision": 38,
          "scale": 0
        },
        "string",
        "int",
        "null"
      ]
    }
  ]
}
Here is the code I'm currently using:
import avro.schema
schema = avro.schema.parse(open('my_msg.avsc', 'rb').read())
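To confirm which names the parser is missing, a small standard-library helper can walk the schema JSON and list every named type that is referenced but never defined. This is a simplified sketch: `find_undefined` is a hypothetical helper, not part of the avro package, and it ignores definition order and aliases (Avro itself also requires a named type to be defined before it is used).

```python
# Avro primitive type names; any other bare string is a reference to a
# named type (record, enum, fixed) that must be defined somewhere.
PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}
NAMED_TYPES = {"record", "error", "enum", "fixed"}


def _full_name(name, namespace):
    """Resolve a possibly-short name against the enclosing namespace."""
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def find_undefined(schema):
    """Return full names referenced in `schema` but not defined in it."""
    defined, referenced = set(), set()

    def walk(node, namespace):
        if isinstance(node, str):
            if node not in PRIMITIVES:
                referenced.add(_full_name(node, namespace))
        elif isinstance(node, list):  # a union: walk every branch
            for branch in node:
                walk(branch, namespace)
        elif isinstance(node, dict):
            kind = node.get("type")
            if not isinstance(kind, str):
                walk(kind, namespace)  # nested form such as {"type": [...]}
            elif kind in NAMED_TYPES:
                name = node["name"]
                # A dotted name carries its own namespace; otherwise use the
                # explicit "namespace" attribute or inherit the enclosing one.
                if "." in name:
                    namespace = name.rsplit(".", 1)[0]
                else:
                    namespace = node.get("namespace", namespace)
                defined.add(_full_name(name, namespace))
                for field in node.get("fields", []):
                    walk(field["type"], namespace)
            elif kind == "array":
                walk(node["items"], namespace)
            elif kind == "map":
                walk(node["values"], namespace)
            else:  # e.g. {"type": "long", "logicalType": "timestamp-millis"}
                walk(kind, namespace)

    walk(schema, None)
    return referenced - defined
```

Run on the question's file, `find_undefined(json.load(open('my_msg.avsc')))` should report `com.myapp.avromsg.common.MilestoneField`, which is the same name the parser complains about.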
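I don't know how to do this in Python, but I can show you the Java version (I'd expect it to be similar). You have two options: include the definition of the MilestoneField object as part of the schema (not clean at all if you use it in several places), or add extra types to the Schema.Parser. In the example I hard-coded the schemas, but the idea is the same when reading them from a File.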
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;

// Wrapper class name is illustrative
public class ExtraTypesExample {
    public static void main(String[] args) {
        Schema.Parser parser = new Schema.Parser();
        // Parse the custom type on its own first
        Schema pojo = new Schema.Parser().parse("{\n" +
            "  \"namespace\": \"io.fama.pubsub.schema\",\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"Pojo\",\n" +
            "  \"fields\": [\n" +
            "    {\n" +
            "      \"name\": \"field\",\n" +
            "      \"type\": \"string\"\n" +
            "    }\n" +
            "  ]\n" +
            "}");
        // Make the parsed type known to the parser that reads the main schema
        Map<String, Schema> extraTypes = new HashMap<>();
        extraTypes.put("Pojo", pojo);
        parser.addTypes(extraTypes);
        // "Pojo" can now be referenced by name (it is in the same namespace)
        Schema schema = parser.parse("{\n" +
            "  \"namespace\": \"io.fama.pubsub.schema\",\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"PojoCollection\",\n" +
            "  \"fields\": [\n" +
            "    {\n" +
            "      \"name\": \"pojosCollection\",\n" +
            "      \"type\": {\n" +
            "        \"type\": \"array\",\n" +
            "        \"items\": \"Pojo\"\n" +
            "      }\n" +
            "    }, {\n" +
            "      \"name\": \"additionaField\",\n" +
            "      \"type\": [\"null\", \"string\"]\n" +
            "    }\n" +
            "  ]\n" +
            "}");
    }
}
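As you can see, you can use the addTypes method to include other custom objects in your schema. The method parameter is a Map<String, Schema>, so you need to parse your custom object's schema first. If you have the class version of the schema (generated by Avro), you should be able to add it like this: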
extraTypes.put("MilestoneField", MilestoneField.SCHEMA$);
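The first option (embedding the MilestoneField definition in the schema) can also be done in Python without editing my_msg.avsc by hand, by splicing the definition in at load time. A minimal standard-library sketch, assuming the MilestoneField definition is available as its own JSON document and that references use the full name, as they do in my_msg.avsc; `inline_definition` is a hypothetical helper. Only the first reference is replaced, because Avro allows a named type to be defined once and later references to it by name.

```python
import copy


def inline_definition(schema, definition):
    """Return a copy of `schema` in which the first reference to the named
    type `definition` is replaced by the full definition.

    Fields are walked in order, so the definition lands before any later
    reference, which stays a plain name.
    """
    name = definition["name"]
    namespace = definition.get("namespace")
    full_name = f"{namespace}.{name}" if namespace and "." not in name else name
    replaced = False

    def walk(node):
        nonlocal replaced
        if isinstance(node, str):
            if not replaced and node == full_name:
                replaced = True
                return copy.deepcopy(definition)
            return node
        if isinstance(node, list):  # union: branches are walked in order
            return [walk(branch) for branch in node]
        if isinstance(node, dict):
            node = dict(node)  # shallow copy so the input is never mutated
            kind = node.get("type")
            if kind in ("record", "error"):
                node["fields"] = [dict(f, type=walk(f["type"])) for f in node["fields"]]
            elif kind == "array":
                node["items"] = walk(node["items"])
            elif kind == "map":
                node["values"] = walk(node["values"])
            elif kind not in ("enum", "fixed"):
                # e.g. {"type": "long", "logicalType": ...} or {"type": "<name>"}
                node["type"] = walk(kind)
            return node
        return node

    result = walk(schema)
    if not replaced:
        raise ValueError(f"{full_name} is not referenced in the schema")
    return result
```

The returned dict can then be serialized with `json.dumps` and handed to `avro.schema.parse`. The trade-off is the one noted above: every schema that uses MilestoneField needs the same splicing step.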
Assuming I have avsc files that define my custom field and my message schema, here is how I did it with Python avro:
import json

import avro.schema

schema_list = []
# First add the custom field schema, so its name is defined before it is used
with open('custom_field.avsc') as f:
    schema_list.append(json.load(f))
# Then add the main message schema, which references the custom field by name
with open('main_msg.avsc') as f:
    schema_list.append(json.load(f))
# Convert the list back to a JSON string (a JSON array is parsed as a union)
schema_json = json.dumps(schema_list)
# Parse the schema
full_msg_schema = avro.schema.parse(schema_json)