Airflow DAG 中的外部文件
External files in Airflow DAG
我正在尝试访问 Airflow 任务中的外部文件以读取一些 sql,我得到 "file not found"。有人遇到过这个吗?
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
dag = DAG(
'my_dat',
start_date=datetime(2017, 1, 1),
catchup=False,
schedule_interval=timedelta(days=1)
)
def run_query():
# read the query
query = open('sql/queryfile.sql')
# run the query
execute(query)
tas = PythonOperator(
task_id='run_query', dag=dag, python_callable=run_query)
日志状态如下:
IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'
我知道我可以简单地将查询复制并粘贴到同一个文件中,这确实不是一个很好的解决方案。有多个查询并且文本非常大,用 Python 代码嵌入它会降低可读性。
所有相对路径均参考AIRFLOW_HOME环境变量。尝试:
- 给出绝对路径
- 相对于 AIRFLOW_HOME
放置文件
- 尝试在 python 可调用文件中记录 PWD,然后决定要提供的路径(最佳选择)
这里是一个使用 变量 的例子,使它变得简单。
首先在Airflow UI
-> Admin
-> Variable
中添加变量,例如。 {key: 'sql_path', values: 'your_sql_script_folder'}
然后在您的 DAG 中添加以下代码,以使用刚刚添加的 Airflow 中的变量。
DAG代码:
import airflow
from airflow.models import Variable
tmpl_search_path = Variable.get("sql_path")
dag = airflow.DAG(
'tutorial',
schedule_interval="@daily",
template_searchpath=tmpl_search_path, # this
default_args=default_args
)
现在您可以使用sql文件夹变量
下的脚本名称或路径
您可以在 this
中了解更多信息
假设sql
目录是相对于当前Python文件的,你可以像这样算出sql文件的绝对路径:
import os
CUR_DIR = os.path.abspath(os.path.dirname(__file__))
def run_query():
# read the query
query = open(f"{CUR_DIR}/sql/queryfile.sql")
# run the query
execute(query)
我正在尝试访问 Airflow 任务中的外部文件以读取一些 sql,我得到 "file not found"。有人遇到过这个吗?
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
dag = DAG(
'my_dat',
start_date=datetime(2017, 1, 1),
catchup=False,
schedule_interval=timedelta(days=1)
)
def run_query():
# read the query
query = open('sql/queryfile.sql')
# run the query
execute(query)
tas = PythonOperator(
task_id='run_query', dag=dag, python_callable=run_query)
日志状态如下:
IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'
我知道我可以简单地将查询复制并粘贴到同一个文件中,这确实不是一个很好的解决方案。有多个查询并且文本非常大,用 Python 代码嵌入它会降低可读性。
所有相对路径均参考AIRFLOW_HOME环境变量。尝试:
- 给出绝对路径
- 相对于 AIRFLOW_HOME 放置文件
- 尝试在 python 可调用文件中记录 PWD,然后决定要提供的路径(最佳选择)
这里是一个使用 变量 的例子,使它变得简单。
首先在
Airflow UI
->Admin
->Variable
中添加变量,例如。{key: 'sql_path', values: 'your_sql_script_folder'}
然后在您的 DAG 中添加以下代码,以使用刚刚添加的 Airflow 中的变量。
DAG代码:
import airflow
from airflow.models import Variable
tmpl_search_path = Variable.get("sql_path")
dag = airflow.DAG(
'tutorial',
schedule_interval="@daily",
template_searchpath=tmpl_search_path, # this
default_args=default_args
)
现在您可以使用sql文件夹变量
下的脚本名称或路径
您可以在 this
中了解更多信息
假设sql
目录是相对于当前Python文件的,你可以像这样算出sql文件的绝对路径:
import os
CUR_DIR = os.path.abspath(os.path.dirname(__file__))
def run_query():
# read the query
query = open(f"{CUR_DIR}/sql/queryfile.sql")
# run the query
execute(query)