Azure 中的 CSV 到 AVRO 转换

CSV to AVRO conversion in Azure

我正在尝试使用创建的方案将存储在 azure data lake store 中的 csv 文件转换为 avro 文件。是否有任何具有相同目的的示例源代码?

您可以为此使用 Azure Data Lake Analytics。 https://github.com/Azure/usql/blob/master/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/AvroExtractor.cs 处有一个示例 Avro 提取器。您可以轻松地将代码改编成输出器。

另一种可能性是在数据湖存储之上启动 HDInsight 集群并使用 Pig、Hive 或 Spark。

使用 Azure 数据工厂Blob 存储,这实际上非常简单。这也应该非常便宜,因为您在 ADF 中执行时按秒付费,因此您只需为转换时间付费。无需基础设施。

如果您的 CSV 看起来像这样

ID,Name,Surname
1,Adam,Marczak
2,Tom,Kowalski
3,John,Johnson

上传到blob存储输入容器

为 ADFblob 存储添加链接服务

Select 你的存储空间

添加数据集

blob类型

并设置为CSV格式

具有这样的值

添加另一个数据集

blob类型

和select Avro类型

有喜欢的价值

添加管道

拖放 复制数据 activity

并且在 source select 你的 CSV 输入数据集

并且在 sink select 你的 target Avro 数据集

并且发布并且触发管道

输出

并且在 blob

通过检查你可以看到 Avro 文件

完整的 github 代码在这里 https://github.com/MarczakIO/azure-datafactory-csv-to-avro

如果您想了解数据工厂,请观看 ADF 介绍视频 https://youtu.be/EpDkxTHAhOs

如果您想动态地将输入和输出路径传递给 blob 文件,请查看有关 ADF 视频参数化的视频 https://youtu.be/pISBgwrdxPM

Python永远是你最好的朋友。请使用此示例代码将 csv 转换为 avro:

安装这些依赖项:

pip install fastavro
pip install pandas

执行以下 python 脚本。

from fastavro import writer, parse_schema
import pandas as pd

# Read CSV
df = pd.read_csv('sample.csv')

# Define AVRO schema
schema = {
    'doc': 'Documentation',
    'name': 'Subject',
    'namespace': 'test',
    'type': 'record',
    'fields': [{'name': c, 'type': 'string'} for c in df.columns]
}
parsed_schema = parse_schema(schema)

# Writing AVRO file
with open('sample.avro', 'wb') as out:
    writer(out, parsed_schema, df.to_dict('records'))

输入:sample.csv

col1,col2,col3
a,b,c
d,e,f
g,h,i

输出:sample.avro

Objavro.codecnullavro.schemaƒ{"type": "record", "name": "test.Subject", "fields": [{"name": "col1", "type": "string"}, {"name": "col2", "type": "string"}, {"name": "col3", "type": "string"}]}Y«Ÿ>[Ú   Ÿÿ  Æ?âQI$abcdefghiY«Ÿ>[Ú   Ÿÿ  Æ?âQI