创建 Azure 函数以执行 ETL

Creating Azure Function to perform ETL

我对 Azure Functions 还很陌生,所以我在理解要采取什么步骤时遇到了一些困难。

我在 Python 中编写了一段代码,它从 MongoDB 数据库中获取特定的 JSON 文件,将其展平并作为 CSV 文件导出到 Azure Data Lake Storage .

我做了一些研究,认为我可以创建一个 Azure 函数并使用 Blob 存储触发器,它可以检查 JSON 文件是否已上传到 test directory 并自动执行我的 Python 脚本将其展平并将其导出为 CSV 文件。

但是,我现在如何修改我的 Python 脚本,使其导入已上传到 test directory 的 JSON 文件,而不是连接到 MongoDB 数据库?

from pymongo import MongoClient
import pandas as pd
import os, uuid, sys
import collections
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
from pandas import json_normalize
from datetime import datetime, timedelta

mongo_client = MongoClient("xxxxxxx")
db = mongo_client.r_db
table = db.areas

document = table.find()
mongo_docs = list(document)
mongo_docs = json_normalize(mongo_docs)
mongo_docs.to_csv("areas.csv", sep = ",", index=False) 

#print(mongo_docs)
try:  
    global service_client
        
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
        "https", "xxxx"), credential='xxxxx')
    

    file_system_client = service_client.get_file_system_client(file_system="root")

    directory_client = file_system_client.get_directory_client("testdirectory")

    file_client = directory_client.create_file("areas.csv")
    local_file = open(r"C:\Users\areas.csv",'rb')

    file_contents = local_file.read()

    file_client.upload_data(file_contents, overwrite=True)

except Exception as e:
    print(e) 

对于此类问题,创建事件网格触发器或 Blob 存储触发器更好吗?

如有任何帮助或建议,我们将不胜感激。

However, how do I now amend my Python script in a way for it to import that JSON file that has been uploaded to test directory rather than connecting to the MongoDB database?

如果使用blob触发器,首先需要将数据传输到存储账户,然后才能在函数内部处理数据:

import logging

import azure.functions as func


def main(myblob: func.InputStream):
    #just put the python script here.
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.length} bytes")

请注意不要使用blob输出绑定,需要手动编写处理逻辑,否则基于数据湖的对象将被破坏,无法再使用数据湖数据包接收。

Is it better to create an Event Grid Trigger or Blob Storage Trigger for this sort of problem?

Any help or advice will be appreciated.

如果你的需求比较单一(即只需要在数据湖传入数据时执行),那么可以使用blob trigger。事件网格的优点是端点可以由许多不同的事件触发。