CSV 到 AVRO 使用 python
CSV to AVRO using python
我有以下 csv:
field1;field2;field3;field4;field5;field6;field7;field8;field9;field10;field11;field12;
eu;4523;35353;01/09/1999; 741 ; 386 ; 412 ; 86 ; 1.624 ; 1.038 ; 469 ; 117 ;
我想把它转换成 avro。我创建了以下 avro 模式:
{"namespace": "forecast.avro",
"type": "record",
"name": "forecast",
"fields": [
{"name": "field1", "type": "string"},
{"name": "field2", "type": "string"},
{"name": "field3", "type": "string"},
{"name": "field4", "type": "string"},
{"name": "field5", "type": "string"},
{"name": "field6", "type": "string"},
{"name": "field7", "type": "string"},
{"name": "field8", "type": "string"},
{"name": "field9", "type": "string"},
{"name": "field10", "type": "string"},
{"name": "field11", "type": "string"},
{"name": "field12", "type": "null"}
]
}
我的代码是下一个:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import csv
from collections import namedtuple
FORECAST = "forecast.csv"
fields = ("field1", "field2", "field3", "field4", "field5", "field6", "field7", "field8", "field9", "field10", "field11", "field12")
forecastRecord = namedtuple('forecastRecord', fields)
def read_forecast_data(path):
with open(path, 'rU') as data:
data.readline()
reader = csv.reader(data, delimiter = ";")
for row in map(forecastRecord._make, reader):
print(row)
yield row
if __name__=="__main__":
for row in read_forecast_data(FORECAST):
print (row)
break
def parse_schema(path="forecast.avsc"):
with open(path, 'r') as data:
return avro.schema.parse(data.read())
def serialize_records(records, outpath="forecast.avro"):
schema = parse_schema()
with open(outpath, 'w') as out:
writer = DataFileWriter(out, DatumWriter(), schema)
for record in records:
record = dict((f, getattr(record, f)) for f in record._fields)
writer.append(record)
if __name__ == "__main__":
serialize_records(read_forecast_data(FORECAST))
当我 运行 代码时,我收到错误消息,指出数据不是当前模式的示例。我一次又一次地检查我的架构以发现任何不一致之处,但直到现在我还没有找到任何不一致之处。有人可以帮我吗?
当我 运行 您编写的代码时,我在 for row in map(forecastRecord._make, reader):
处收到错误 TypeError: Expected 12 arguments, got 13
,因为您的 CSV 以 ;
结尾,因此有 13 个字段。
删除尾随的 ;
后,我可以 运行 该示例并得到关于架构不匹配的相同错误。原因是您的架构中的 field12
被定义为 null
的类型,但在数据中它是 string
类型(值为 "117"
)。
如果您将 avsc 文件更改为 {"name": "field12", "type": "string"}
那么它就可以工作了。
我有以下 csv:
field1;field2;field3;field4;field5;field6;field7;field8;field9;field10;field11;field12;
eu;4523;35353;01/09/1999; 741 ; 386 ; 412 ; 86 ; 1.624 ; 1.038 ; 469 ; 117 ;
我想把它转换成 avro。我创建了以下 avro 模式:
{"namespace": "forecast.avro",
"type": "record",
"name": "forecast",
"fields": [
{"name": "field1", "type": "string"},
{"name": "field2", "type": "string"},
{"name": "field3", "type": "string"},
{"name": "field4", "type": "string"},
{"name": "field5", "type": "string"},
{"name": "field6", "type": "string"},
{"name": "field7", "type": "string"},
{"name": "field8", "type": "string"},
{"name": "field9", "type": "string"},
{"name": "field10", "type": "string"},
{"name": "field11", "type": "string"},
{"name": "field12", "type": "null"}
]
}
我的代码是下一个:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import csv
from collections import namedtuple
FORECAST = "forecast.csv"
fields = ("field1", "field2", "field3", "field4", "field5", "field6", "field7", "field8", "field9", "field10", "field11", "field12")
forecastRecord = namedtuple('forecastRecord', fields)
def read_forecast_data(path):
with open(path, 'rU') as data:
data.readline()
reader = csv.reader(data, delimiter = ";")
for row in map(forecastRecord._make, reader):
print(row)
yield row
if __name__=="__main__":
for row in read_forecast_data(FORECAST):
print (row)
break
def parse_schema(path="forecast.avsc"):
with open(path, 'r') as data:
return avro.schema.parse(data.read())
def serialize_records(records, outpath="forecast.avro"):
schema = parse_schema()
with open(outpath, 'w') as out:
writer = DataFileWriter(out, DatumWriter(), schema)
for record in records:
record = dict((f, getattr(record, f)) for f in record._fields)
writer.append(record)
if __name__ == "__main__":
serialize_records(read_forecast_data(FORECAST))
当我 运行 代码时,我收到错误消息,指出数据不是当前模式的示例。我一次又一次地检查我的架构以发现任何不一致之处,但直到现在我还没有找到任何不一致之处。有人可以帮我吗?
当我 运行 您编写的代码时,我在 for row in map(forecastRecord._make, reader):
处收到错误 TypeError: Expected 12 arguments, got 13
,因为您的 CSV 以 ;
结尾,因此有 13 个字段。
删除尾随的 ;
后,我可以 运行 该示例并得到关于架构不匹配的相同错误。原因是您的架构中的 field12
被定义为 null
的类型,但在数据中它是 string
类型(值为 "117"
)。
如果您将 avsc 文件更改为 {"name": "field12", "type": "string"}
那么它就可以工作了。