Airflow DAG 中的外部文件

External files in Airflow DAG

我正在尝试访问 Airflow 任务中的外部文件以读取一些 sql,我得到 "file not found"。有人遇到过这个吗?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dat',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query
    query = open('sql/queryfile.sql')
    # run the query
    execute(query)

tas = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)

日志状态如下:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'

我知道我可以简单地将查询复制并粘贴到同一个文件中,这确实不是一个很好的解决方案。有多个查询并且文本非常大,用 Python 代码嵌入它会降低可读性。

所有相对路径均参考AIRFLOW_HOME环境变量。尝试:

  • 给出绝对路径
  • 相对于 AIRFLOW_HOME
  • 放置文件
  • 尝试在 python 可调用文件中记录 PWD,然后决定要提供的路径(最佳选择)

这里是一个使用 变量 的例子,使它变得简单。

  • 首先在Airflow UI -> Admin -> Variable中添加变量,例如。 {key: 'sql_path', values: 'your_sql_script_folder'}

  • 然后在您的 DAG 中添加以下代码,以使用刚刚添加的 Airflow 中的变量。

DAG代码:

import airflow
from airflow.models import Variable

tmpl_search_path = Variable.get("sql_path")

dag = airflow.DAG(
   'tutorial',
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,  # this
    default_args=default_args
)
  • 现在您可以使用sql文件夹变量

  • 下的脚本名称或路径
  • 您可以在 this

  • 中了解更多信息

假设sql目录是相对于当前Python文件的,你可以像这样算出sql文件的绝对路径:

import os

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def run_query():
    # read the query
    query = open(f"{CUR_DIR}/sql/queryfile.sql")
    # run the query
    execute(query)