将宏值传递给气流中的 sql 文件

Passing macros value to sql file in airflow

我有一个 sql 文件,有一个 sql 查询 :-

delete from xyz where id in = 3 and time = '{{ execution_date.subtract(hours=2).strftime("%Y-%m-%d %H:%M:%S") }}';

这里我在 sql 查询本身中编写宏,我想从操作员调用此 sql 查询的 python 文件中传递它的值。

time = f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\''

我想将这个全局时间变量传递给 sql 文件,而不是在那里再次编写完整的宏。

PostgresOperator(dag=dag,
                 task_id='delete_entries', 
                 postgres_conn_id='database_connection',
                 sql='sql/delete_entry.sql')

如果我在使用 jinja 模板作为 {{ time }} 的查询中使用 time,而不是评估它,它仅作为完整的字符串传递。 请帮助,坚持了很久。

由于您想在两个运算符中使用 f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\'' 而无需复制代码,您可以将其定义为用户宏。

from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator


def ds_macro_format(execution_date, hours):
    return execution_date.subtract(hours=hours).strftime("%Y-%m-%d %H:%M:%S")


user_macros = {
    'format': ds_macro_format
}

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 6, 7),
}

dag = DAG(
    "Whosebug_question1",
    default_args=default_args,
    schedule_interval="@daily",
    user_defined_macros=user_macros
)

PostgresOperator(dag=dag,
                 task_id='delete_entries',
                 postgres_conn_id='database_connection',
                 sql='sql/delete_entry.sql')

delete_entry.sql 为:

delete from xyz where id in = 3 and time = {{ format(execution_date, hours=2) }};

假设您还想在 BashOperator 中使用宏,您可以这样做:

BashOperator(
    task_id='bash_task',
    bash_command='echo {{ format(execution_date, hours=2) }} ',
    dag=dag,
)