Avro Python from CSV - avro.io.AvroTypeException: The datum is not an example of the schema
I'm new to Avro. I am trying to parse a simple CSV file that contains one string value and one int value, but I get the error: avro.io.AvroTypeException: The datum is not an example of the schema
The schema I'm using is:
{
  "namespace": "paymenttransaction",
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "TransactionId", "type": "string"},
    {"name": "Id", "type": "int"}
  ]
}
The CSV file contains the following:
TransactionId,Id
2018040101000222749,1
And the Python code I run for the producer is:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv

value_schema = avro.load('/home/daniela/avro/example.avsc')

AvroProducerConf = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081',
}

avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)

with open('/home/usertest/avro/data/paymenttransactions.csv') as file:
    reader = csv.DictReader(file, delimiter=",")
    for row in reader:
        avroProducer.produce(topic='test', value=row)
        print(row)
    avroProducer.flush()
What am I doing wrong?
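This happens because Id is still a string, while the schema expects an int. csv.DictReader returns every field as a str, so the value has to be converted before it is produced.
Try: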
with open('/home/usertest/avro/data/paymenttransactions.csv') as file:
    reader = csv.DictReader(file, delimiter=",")
    for row in reader:
        # Cast Id to int so the datum matches the "int" field in the schema.
        data_set = {"TransactionId": row["TransactionId"], "Id": int(row["Id"])}
        avroProducer.produce(topic='test', value=data_set)
        print(row)
    avroProducer.flush()
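If the schema has more than a couple of fields, casting each one by hand gets tedious. As a rough sketch (not part of the original answer), the conversion could be driven by the schema itself. The `CASTS` table and the `coerce_row` helper below are hypothetical names introduced for illustration, and the sketch assumes every field type is a plain primitive name such as "string" or "int" (no unions, nested records, or logical types). It runs without Kafka, using an in-memory copy of the CSV from the question:

```python
import csv
import io
import json

# Schema copied from the question.
SCHEMA = json.loads("""
{
  "namespace": "paymenttransaction",
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "TransactionId", "type": "string"},
    {"name": "Id", "type": "int"}
  ]
}
""")

# Hypothetical mapping from Avro primitive type names to Python converters.
# Assumption: only these simple types appear in the schema.
CASTS = {"string": str, "int": int, "long": int, "float": float, "double": float}


def coerce_row(row, schema=SCHEMA):
    """Return a new dict with each CSV value cast to the type its schema field expects."""
    return {
        field["name"]: CASTS[field["type"]](row[field["name"]])
        for field in schema["fields"]
    }


# In-memory stand-in for paymenttransactions.csv.
sample = io.StringIO("TransactionId,Id\n2018040101000222749,1\n")
raw = next(csv.DictReader(sample))
typed = coerce_row(raw)

print(type(raw["Id"]).__name__)    # str: what DictReader hands back
print(type(typed["Id"]).__name__)  # int: what the schema expects
```

With something like this in place, the loop body would pass `coerce_row(row)` as the `value` argument to `avroProducer.produce(...)` instead of building the dict by hand.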