如何在气流中使用默认变量
How to use Default variables in airflow
我们如何使用 airflow
中的默认变量,如下所示 link
https://airflow.apache.org/code.html#default-variables
我在我的代码中使用了它,如下所示:
def decide_which_task():
if {{ dag_run.task_id }} is "Move_file":
return "move_file"
else:
return "push_to_db"
但是我在日志文件中收到错误
NameError: global name 'dag_run' is not defined
您尝试使用的注解是字符串中 Jinja 模板的注解。
要在任务中使用相同的变量,您需要:
- 在
PythonOperator
中使用参数provide_context=True
- 将函数签名更改为
def decide_which_task(**context):
- 然后在上下文之外访问变量,例如
mytask = context['task_id']
代码:
def decide_which_task(**context):
if context['task_id'] is "Move_file":
return "move_file"
else:
return "push_to_db"
我们如何使用 airflow
中的默认变量,如下所示 link
https://airflow.apache.org/code.html#default-variables
我在我的代码中使用了它,如下所示:
def decide_which_task():
if {{ dag_run.task_id }} is "Move_file":
return "move_file"
else:
return "push_to_db"
但是我在日志文件中收到错误
NameError: global name 'dag_run' is not defined
您尝试使用的注解是字符串中 Jinja 模板的注解。
要在任务中使用相同的变量,您需要:
- 在
PythonOperator
中使用参数 - 将函数签名更改为
def decide_which_task(**context):
- 然后在上下文之外访问变量,例如
mytask = context['task_id']
provide_context=True
代码:
def decide_which_task(**context):
if context['task_id'] is "Move_file":
return "move_file"
else:
return "push_to_db"