if statement in Airflow templated field

I have a DAG that runs some Snowflake operators, and in the SQL file I template the dates as follows:

'{{ prev_execution_date.subtract(minutes=15).in_tz('America/Toronto').to_datetime_string() }}'
'{{ execution_date.in_tz('America/Toronto').to_datetime_string() }}'

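Everything works fine.

I would also like to be able to trigger the DAG manually and pass in the dates, so I tried the following in the query file: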

{{ dag_run.conf['startdate'] if dag_run else prev_execution_date.subtract(minutes=15).in_tz('America/Toronto').to_datetime_string()  }}

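(This is basically the solution from another post.)

This works fine for manually triggered runs where I pass in a startdate value, but the else clause always returns blank when the run is a scheduled one.

Am I missing something in the else clause, or is there a different solution that bypasses the if statement altogether?

I am using Airflow 1.10.12.

Thanks!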

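This is because the dag_run object always exists, whether the run is manual or scheduled, so the condition "if dag_run" is always true and the else clause is never reached. Check the run's external_trigger flag instead, which is true only for manually triggered runs: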

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="trigger_dag",
    start_date=datetime(2021, 4, 5),
    catchup=True,
    schedule_interval='@daily',
)
with dag:
    op = PythonOperator(
        task_id='a',
        # The parameter name must match the op_kwargs key ("value").
        python_callable=lambda value: print(value),
        op_kwargs={
            "value": "{{ dag_run.conf['startdate'] if dag_run['external_trigger'] else prev_execution_date.subtract(minutes=15).in_tz('America/Toronto').to_datetime_string() }}"
        },
    )

Here is the output of a scheduled run, which prints the modified execution date value:

[2021-04-08 05:46:53,430] {taskinstance.py:901} INFO - Executing <Task(PythonOperator): a> on 2021-04-07T00:00:00+00:00
[2021-04-08 05:46:53,434] {standard_task_runner.py:54} INFO - Started process 2604 to run task
[2021-04-08 05:46:53,462] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'trigger_dag', 'a', '2021-04-07T00:00:00+00:00', '--job_id', '35', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/trigger.py', '--cfg_path', '/tmp/tmpe8zsnj8x']
[2021-04-08 05:46:53,463] {standard_task_runner.py:78} INFO - Job 35: Subtask a
[2021-04-08 05:46:53,509] {logging_mixin.py:112} INFO - Running <TaskInstance: trigger_dag.a 2021-04-07T00:00:00+00:00 [running]> on host 953a6668d603
[2021-04-08 05:46:53,564] {logging_mixin.py:112} INFO - 2021-04-05 19:45:00
[2021-04-08 05:46:53,565] {python_operator.py:114} INFO - Done. Returned value was: None
[2021-04-08 05:46:53,574] {taskinstance.py:1070} INFO - Marking task as SUCCESS.dag_id=trigger_dag, task_id=a, execution_date=20210407T000000, start_date=20210408T054653, end_date=20210408T054653
[2021-04-08 05:46:58,385] {local_task_job.py:102} INFO - Task exited with return code 0

Here is the output of a manually triggered run, which prints the value passed in conf:

[2021-04-08 05:47:47,431] {taskinstance.py:901} INFO - Executing <Task(PythonOperator): a> on 2021-04-08T05:47:31.333405+00:00
[2021-04-08 05:47:47,434] {standard_task_runner.py:54} INFO - Started process 2665 to run task
[2021-04-08 05:47:47,463] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'trigger_dag', 'a', '2021-04-08T05:47:31.333405+00:00', '--job_id', '36', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/trigger.py', '--cfg_path', '/tmp/tmpwoshw679']
[2021-04-08 05:47:47,464] {standard_task_runner.py:78} INFO - Job 36: Subtask a
[2021-04-08 05:47:47,512] {logging_mixin.py:112} INFO - Running <TaskInstance: trigger_dag.a 2021-04-08T05:47:31.333405+00:00 [running]> on host 953a6668d603
[2021-04-08 05:47:47,564] {logging_mixin.py:112} INFO - super-duper
[2021-04-08 05:47:47,564] {python_operator.py:114} INFO - Done. Returned value was: None
[2021-04-08 05:47:47,574] {taskinstance.py:1070} INFO - Marking task as SUCCESS.dag_id=trigger_dag, task_id=a, execution_date=20210408T054731, start_date=20210408T054747, end_date=20210408T054747
[2021-04-08 05:47:52,388] {local_task_job.py:102} INFO - Task exited with return code 0
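
To see why the original condition never reached the else clause, the two template expressions can be mimicked in plain Python. This is only a rough sketch, not Airflow code: scheduled_run and manual_run are hypothetical stand-ins for the real DagRun object, the two helper functions are made up for illustration, and the empty-string result assumes Jinja's default behaviour of rendering a failed lookup as blank. The fallback uses a fixed UTC-4 offset, which matches Toronto on the date in the logs but is not a substitute for a real time zone database.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace

# Hypothetical stand-ins for Airflow's DagRun; only the two attributes
# used by the template are modelled here.
scheduled_run = SimpleNamespace(conf=None, external_trigger=False)
manual_run = SimpleNamespace(conf={"startdate": "super-duper"}, external_trigger=True)


def start_date_original(dag_run, fallback):
    """Mimics: conf['startdate'] if dag_run else fallback."""
    if dag_run:  # true for every run, because a dag_run object always exists
        # Assumption: Jinja renders a failed lookup as an empty string.
        return (dag_run.conf or {}).get("startdate", "")
    return fallback


def start_date_fixed(dag_run, fallback):
    """Mimics: conf['startdate'] if dag_run.external_trigger else fallback."""
    if dag_run.external_trigger:
        return (dag_run.conf or {}).get("startdate", "")
    return fallback


# Fallback for the scheduled run in the logs: previous execution date
# (2021-04-06 00:00 UTC) minus 15 minutes, shown at a fixed UTC-4 offset.
prev_execution_date = datetime(2021, 4, 6, tzinfo=timezone.utc)
toronto_edt = timezone(timedelta(hours=-4))
fallback = (
    (prev_execution_date - timedelta(minutes=15))
    .astimezone(toronto_edt)
    .strftime("%Y-%m-%d %H:%M:%S")
)

print(repr(start_date_original(scheduled_run, fallback)))  # blank, as in the question
print(start_date_fixed(scheduled_run, fallback))           # falls through to the else value
print(start_date_fixed(manual_run, fallback))              # value passed in conf
```

With the external_trigger check, the scheduled stand-in falls through to the computed date and the manual stand-in returns its conf value, which is the same split the two logs show.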