Handling nested AVRO schemas with Python 3
I am using avro 1.8.2 + Python 3.7 (pip install avro-python3) to work with the AVRO format.
Here is the example code from the AVRO website:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
schema = avro.schema.parse(open("user.avsc", "rb").read())
writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
    print(user)
reader.close()
This code does not work, because the parse method has been renamed to Parse, and its second parameter, which was needed to support nested schemas, has been removed.
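For a flat schema such as user.avsc above, switching to the renamed function is enough. Here is a minimal sketch assuming avro-python3 1.8.2 (Parse takes the schema JSON as a single string and has no second argument):
import avro.schema

# In avro-python3 the function is Parse; it only accepts the schema JSON,
# so there is no place to pass a Names registry for resolving references.
with open("user.avsc", "rb") as f:
    schema = avro.schema.Parse(f.read().decode("utf-8"))
That covers self-contained schemas, but it gives no way to register one record so that another schema can refer to it by name.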
So the question is: how do you read/write AVRO with nested schemas in Python 3?
After reading the source code of the Avro library, I came up with a way to do it. Here is the code:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

def create_schema():
    # A shared Names registry lets later schemas reference earlier ones by name.
    names = avro.schema.Names()
    load = lambda dict_value: avro.schema.SchemaFromJSONData(dict_value, names=names)
    transaction_schema_dict = {
        "namespace": "myavro",
        "type": "record",
        "name": "Transaction",
        "fields": [
            {"name": "name", "type": "string"},
        ]
    }
    account_schema_dict = {
        "namespace": "myavro",
        "type": "record",
        "name": "Account",
        "fields": [
            {"name": "name", "type": "string"},
            # "Transaction" is referenced by name; it must already be registered in names.
            {"name": "transaction", "type": ["null", {"type": "array", "items": "Transaction"}], "default": None},
        ]
    }
    load(transaction_schema_dict)
    return load(account_schema_dict)

def write_avro_file(file_path, schema, data):
    with open(file_path, 'wb') as f, DataFileWriter(f, DatumWriter(), schema) as writer:
        writer.append(data)

def print_avro_file(file_path):
    with open(file_path, 'rb') as f, DataFileReader(f, DatumReader()) as reader:
        for account in reader:
            print(account)

def run():
    schema = create_schema()
    file_path = 'account.avro'
    data = {
        'name': 'my account',
        'transaction': [
            {'name': 'my transaction 1'},
            {'name': 'my transaction 2'},
        ]
    }
    write_avro_file(file_path, schema, data)
    print_avro_file(file_path)

run()
The key is to use the SchemaFromJSONData function instead of Parse, and to pass the same Names object to every call so that the schemas can reference each other. Note that the order of the schema-loading calls matters.
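To illustrate why the order matters, here is a minimal sketch under the same avro-python3 1.8.2 assumption, with hypothetical Inner/Outer records: SchemaFromJSONData resolves name references against the shared Names registry, so the referenced record must be loaded first.
import avro.schema

names = avro.schema.Names()

# Register the referenced record first so its name is known to the registry.
inner = avro.schema.SchemaFromJSONData({
    "namespace": "demo", "type": "record", "name": "Inner",
    "fields": [{"name": "value", "type": "int"}],
}, names=names)

# The enclosing record can now refer to "Inner" purely by name.
outer = avro.schema.SchemaFromJSONData({
    "namespace": "demo", "type": "record", "name": "Outer",
    "fields": [{"name": "child", "type": "Inner"}],
}, names=names)

# Loading Outer first (or using a fresh Names() for each call) would fail,
# because "Inner" would not yet be registered.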