创建 Azure 函数以执行 ETL
Creating Azure Function to perform ETL
我对 Azure Functions 还很陌生,所以我在理解要采取什么步骤时遇到了一些困难。
我在 Python 中编写了一段代码,它从 MongoDB 数据库中获取特定的 JSON 文件,将其展平并作为 CSV 文件导出到 Azure Data Lake Storage .
我做了一些研究,认为我可以创建一个 Azure 函数并使用 Blob 存储触发器,它可以检查 JSON 文件是否已上传到 test directory
并自动执行我的 Python 脚本将其展平并将其导出为 CSV 文件。
但是,我现在如何修改我的 Python 脚本,使其导入已上传到 test directory
的 JSON 文件,而不是连接到 MongoDB 数据库?
from pymongo import MongoClient
import pandas as pd
import os, uuid, sys
import collections
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
from pandas import json_normalize
from datetime import datetime, timedelta
mongo_client = MongoClient("xxxxxxx")
db = mongo_client.r_db
table = db.areas
document = table.find()
mongo_docs = list(document)
mongo_docs = json_normalize(mongo_docs)
mongo_docs.to_csv("areas.csv", sep = ",", index=False)
#print(mongo_docs)
try:
global service_client
service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
"https", "xxxx"), credential='xxxxx')
file_system_client = service_client.get_file_system_client(file_system="root")
directory_client = file_system_client.get_directory_client("testdirectory")
file_client = directory_client.create_file("areas.csv")
local_file = open(r"C:\Users\areas.csv",'rb')
file_contents = local_file.read()
file_client.upload_data(file_contents, overwrite=True)
except Exception as e:
print(e)
对于此类问题,创建事件网格触发器或 Blob 存储触发器更好吗?
如有任何帮助或建议,我们将不胜感激。
However, how do I now amend my Python script in a way for it to import
that JSON file that has been uploaded to test directory rather than
connecting to the MongoDB database?
如果使用blob触发器,首先需要将数据传输到存储账户,然后才能在函数内部处理数据:
import logging
import azure.functions as func
def main(myblob: func.InputStream):
#just put the python script here.
logging.info(f"Python blob trigger function processed blob \n"
f"Name: {myblob.name}\n"
f"Blob Size: {myblob.length} bytes")
请注意不要使用blob输出绑定,需要手动编写处理逻辑,否则基于数据湖的对象将被破坏,无法再使用数据湖数据包接收。
Is it better to create an Event Grid Trigger or Blob Storage Trigger
for this sort of problem?
Any help or advice will be appreciated.
如果你的需求比较单一(即只需要在数据湖传入数据时执行),那么可以使用blob trigger。事件网格的优点是端点可以由许多不同的事件触发。
我对 Azure Functions 还很陌生,所以我在理解要采取什么步骤时遇到了一些困难。
我在 Python 中编写了一段代码,它从 MongoDB 数据库中获取特定的 JSON 文件,将其展平并作为 CSV 文件导出到 Azure Data Lake Storage .
我做了一些研究,认为我可以创建一个 Azure 函数并使用 Blob 存储触发器,它可以检查 JSON 文件是否已上传到 test directory
并自动执行我的 Python 脚本将其展平并将其导出为 CSV 文件。
但是,我现在如何修改我的 Python 脚本,使其导入已上传到 test directory
的 JSON 文件,而不是连接到 MongoDB 数据库?
from pymongo import MongoClient
import pandas as pd
import os, uuid, sys
import collections
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
from pandas import json_normalize
from datetime import datetime, timedelta
mongo_client = MongoClient("xxxxxxx")
db = mongo_client.r_db
table = db.areas
document = table.find()
mongo_docs = list(document)
mongo_docs = json_normalize(mongo_docs)
mongo_docs.to_csv("areas.csv", sep = ",", index=False)
#print(mongo_docs)
try:
global service_client
service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
"https", "xxxx"), credential='xxxxx')
file_system_client = service_client.get_file_system_client(file_system="root")
directory_client = file_system_client.get_directory_client("testdirectory")
file_client = directory_client.create_file("areas.csv")
local_file = open(r"C:\Users\areas.csv",'rb')
file_contents = local_file.read()
file_client.upload_data(file_contents, overwrite=True)
except Exception as e:
print(e)
对于此类问题,创建事件网格触发器或 Blob 存储触发器更好吗?
如有任何帮助或建议,我们将不胜感激。
However, how do I now amend my Python script in a way for it to import that JSON file that has been uploaded to test directory rather than connecting to the MongoDB database?
如果使用blob触发器,首先需要将数据传输到存储账户,然后才能在函数内部处理数据:
import logging
import azure.functions as func
def main(myblob: func.InputStream):
#just put the python script here.
logging.info(f"Python blob trigger function processed blob \n"
f"Name: {myblob.name}\n"
f"Blob Size: {myblob.length} bytes")
请注意不要使用blob输出绑定,需要手动编写处理逻辑,否则基于数据湖的对象将被破坏,无法再使用数据湖数据包接收。
Is it better to create an Event Grid Trigger or Blob Storage Trigger for this sort of problem?
Any help or advice will be appreciated.
如果你的需求比较单一(即只需要在数据湖传入数据时执行),那么可以使用blob trigger。事件网格的优点是端点可以由许多不同的事件触发。