如何在气流中使用默认变量

How to use Default variables in airflow

我们如何使用 airflow 中的默认变量,如下所示 link https://airflow.apache.org/code.html#default-variables

我在我的代码中使用了它,如下所示:

def decide_which_task():

    if {{ dag_run.task_id }}  is  "Move_file":    
            return "move_file"    
        else:    
            return "push_to_db"

但是我在日志文件中收到错误

NameError: global name 'dag_run' is not defined

您尝试使用的注解是字符串中 Jinja 模板的注解。

要在任务中使用相同的变量,您需要:

  • PythonOperator
  • 中使用参数provide_context=True
  • 将函数签名更改为def decide_which_task(**context):
  • 然后在上下文之外访问变量,例如mytask = context['task_id']

代码:

def decide_which_task(**context):

if context['task_id']  is  "Move_file":    
        return "move_file"    
    else:    
        return "push_to_db"