ADF 管道中的 Azure 函数使用 Python 脚本
Azure function in ADF pipeline using a Python script
我正在尝试 运行 我在管道中 Azure 数据工厂中的以下脚本。我的 Python 代码从 Blob 存储中检索 2 个 CSV 文件,并根据密钥将它们合并为一个文件,并将其上传到数据湖存储中。我已经尝试使用函数应用程序块,它给了我 InternalServerError 我也尝试了 Web activity 其中 运行s 没有错误。问题是当我 运行 管道时,文件没有创建,即使管道 运行 成功(使用 Web 块)。当我调用主函数并且文件在数据湖存储中创建时,该函数在本地也 运行s。我也在 VS Code 中尝试过 http 触发器和持久函数,但是 none 他们在 Azure 中创建了“merged.csv”文件。
我的Python脚本(init.py):
import pandas as pd
import logging
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient
import azure.functions as func
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
STORAGEACCOUNTURL= 'https://storage.blob.core.windows.net/'
STORAGEACCOUNTKEY= '****'
LOCALFILENAME= ['file1.csv', 'file2.csv']
CONTAINERNAME= 'inputblob'
file1 = pd.DataFrame()
file2 = pd.DataFrame()
#download from blob
blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)
for i in LOCALFILENAME:
with open(i, "wb") as my_blobs:
blob_client_instance = blob_service_client_instance.get_blob_client(container=CONTAINERNAME, blob=i, snapshot=None)
blob_data = blob_client_instance.download_blob()
blob_data.readinto(my_blobs)
if i == 'file1.csv':
file1 = pd.read_csv(i)
if i == 'file2.csv':
file2 = pd.read_csv(i)
# load
summary = pd.merge(left=file1, right=file2, on='key', how='inner')
summary.to_csv()
global service_client
service_client = DataLakeServiceClient(account_url="https://storage.dfs.core.windows.net/", credential='****')
file_system_client = service_client.get_file_system_client(file_system="outputdatalake")
directory_client = file_system_client.get_directory_client("functionapp")
file_client = directory_client.create_file("merged.csv")
file_contents = summary.to_csv()
file_client.upload_data(file_contents, overwrite=True)
return("This HTTP triggered function executed successfully.")
我的JSON文件(function.json):
{
"scriptFile": "__init__.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get",
"post"
]
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
您的代码看起来不错,并且在另一个环境中运行良好。因此,Azure 函数中的以下原因可能导致内部服务器错误:
- 确保将 Local.Settings.json 文件中的所有值添加到应用程序设置(FunctionApp -> 配置 -> 应用程序设置)
- 检查 CORS。尝试添加“*”(启用 CORS 时针对存储资源发出的任何请求必须具有有效授权 header 或必须针对 public 资源发出。)
我能想到有 2 个原因,这可能是导致您出现问题的原因。
A - 检查您的 requirements.txt。您所有的 python 库都应该在那里。它应该看起来像这样。
azure-functions
pandas==1.3.4
azure-storage-blob==12.9.0
azure-storage-file-datalake==12.5.0
B - 接下来,您似乎正在将文件写入 Functions worker 内存。这是不允许的,也是完全没有必要的。这可以解释为什么它在您的本地机器上工作,但在 Azure 中不工作。你可以不这样做就实现你想要的。请参阅下面的代码部分,它应该符合您的目的。我们将 csv 从 blob 加载到数据帧的方式略有变化。
import pandas as pd
import logging
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient
import azure.functions as func
from io import StringIO
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
STORAGEACCOUNTURL= 'https://storage.blob.core.windows.net/'
STORAGEACCOUNTKEY= 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
LOCALFILENAME= ['file1.csv', 'file2.csv']
CONTAINERNAME= 'inputblob'
file1 = pd.DataFrame()
file2 = pd.DataFrame()
#download from blob
blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)
for i in LOCALFILENAME:
blob_client_instance = blob_service_client_instance.get_blob_client(container=CONTAINERNAME, blob=i, snapshot=None)
blob_data = blob_client_instance.download_blob()
if i == 'file1.csv':
file1 = pd.read_csv(StringIO(blob_data.content_as_text()))
if i == 'file2.csv':
file2 = pd.read_csv(StringIO(blob_data.content_as_text()))
# load
summary = pd.merge(left=file1, right=file2, on='key', how='inner')
summary.to_csv()
service_client = DataLakeServiceClient(account_url="https://storage.dfs.core.windows.net/", credential=STORAGEACCOUNTKEY)
file_system_client = service_client.get_file_system_client(file_system="outputdatalake")
directory_client = file_system_client.get_directory_client("my-directory")
file_client = directory_client.create_file("merged.csv")
file_contents = summary.to_csv()
file_client.upload_data(file_contents, overwrite=True)
return("This HTTP triggered function executed successfully.")
我正在尝试 运行 我在管道中 Azure 数据工厂中的以下脚本。我的 Python 代码从 Blob 存储中检索 2 个 CSV 文件,并根据密钥将它们合并为一个文件,并将其上传到数据湖存储中。我已经尝试使用函数应用程序块,它给了我 InternalServerError 我也尝试了 Web activity 其中 运行s 没有错误。问题是当我 运行 管道时,文件没有创建,即使管道 运行 成功(使用 Web 块)。当我调用主函数并且文件在数据湖存储中创建时,该函数在本地也 运行s。我也在 VS Code 中尝试过 http 触发器和持久函数,但是 none 他们在 Azure 中创建了“merged.csv”文件。
我的Python脚本(init.py):
import pandas as pd
import logging
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient
import azure.functions as func
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
STORAGEACCOUNTURL= 'https://storage.blob.core.windows.net/'
STORAGEACCOUNTKEY= '****'
LOCALFILENAME= ['file1.csv', 'file2.csv']
CONTAINERNAME= 'inputblob'
file1 = pd.DataFrame()
file2 = pd.DataFrame()
#download from blob
blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)
for i in LOCALFILENAME:
with open(i, "wb") as my_blobs:
blob_client_instance = blob_service_client_instance.get_blob_client(container=CONTAINERNAME, blob=i, snapshot=None)
blob_data = blob_client_instance.download_blob()
blob_data.readinto(my_blobs)
if i == 'file1.csv':
file1 = pd.read_csv(i)
if i == 'file2.csv':
file2 = pd.read_csv(i)
# load
summary = pd.merge(left=file1, right=file2, on='key', how='inner')
summary.to_csv()
global service_client
service_client = DataLakeServiceClient(account_url="https://storage.dfs.core.windows.net/", credential='****')
file_system_client = service_client.get_file_system_client(file_system="outputdatalake")
directory_client = file_system_client.get_directory_client("functionapp")
file_client = directory_client.create_file("merged.csv")
file_contents = summary.to_csv()
file_client.upload_data(file_contents, overwrite=True)
return("This HTTP triggered function executed successfully.")
我的JSON文件(function.json):
{
"scriptFile": "__init__.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get",
"post"
]
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
您的代码看起来不错,并且在另一个环境中运行良好。因此,Azure 函数中的以下原因可能导致内部服务器错误:
- 确保将 Local.Settings.json 文件中的所有值添加到应用程序设置(FunctionApp -> 配置 -> 应用程序设置)
- 检查 CORS。尝试添加“*”(启用 CORS 时针对存储资源发出的任何请求必须具有有效授权 header 或必须针对 public 资源发出。)
我能想到有 2 个原因,这可能是导致您出现问题的原因。
A - 检查您的 requirements.txt。您所有的 python 库都应该在那里。它应该看起来像这样。
azure-functions
pandas==1.3.4
azure-storage-blob==12.9.0
azure-storage-file-datalake==12.5.0
B - 接下来,您似乎正在将文件写入 Functions worker 内存。这是不允许的,也是完全没有必要的。这可以解释为什么它在您的本地机器上工作,但在 Azure 中不工作。你可以不这样做就实现你想要的。请参阅下面的代码部分,它应该符合您的目的。我们将 csv 从 blob 加载到数据帧的方式略有变化。
import pandas as pd
import logging
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient
import azure.functions as func
from io import StringIO
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
STORAGEACCOUNTURL= 'https://storage.blob.core.windows.net/'
STORAGEACCOUNTKEY= 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
LOCALFILENAME= ['file1.csv', 'file2.csv']
CONTAINERNAME= 'inputblob'
file1 = pd.DataFrame()
file2 = pd.DataFrame()
#download from blob
blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)
for i in LOCALFILENAME:
blob_client_instance = blob_service_client_instance.get_blob_client(container=CONTAINERNAME, blob=i, snapshot=None)
blob_data = blob_client_instance.download_blob()
if i == 'file1.csv':
file1 = pd.read_csv(StringIO(blob_data.content_as_text()))
if i == 'file2.csv':
file2 = pd.read_csv(StringIO(blob_data.content_as_text()))
# load
summary = pd.merge(left=file1, right=file2, on='key', how='inner')
summary.to_csv()
service_client = DataLakeServiceClient(account_url="https://storage.dfs.core.windows.net/", credential=STORAGEACCOUNTKEY)
file_system_client = service_client.get_file_system_client(file_system="outputdatalake")
directory_client = file_system_client.get_directory_client("my-directory")
file_client = directory_client.create_file("merged.csv")
file_contents = summary.to_csv()
file_client.upload_data(file_contents, overwrite=True)
return("This HTTP triggered function executed successfully.")