将宏值传递给气流中的 sql 文件
Passing macros value to sql file in airflow
我有一个 sql 文件,有一个 sql 查询 :-
delete from xyz where id in = 3 and time = '{{ execution_date.subtract(hours=2).strftime("%Y-%m-%d %H:%M:%S") }}';
这里我在 sql 查询本身中编写宏,我想从操作员调用此 sql 查询的 python 文件中传递它的值。
time = f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\''
我想将这个全局时间变量传递给 sql 文件,而不是在那里再次编写完整的宏。
PostgresOperator(dag=dag,
task_id='delete_entries',
postgres_conn_id='database_connection',
sql='sql/delete_entry.sql')
如果我在使用 jinja 模板作为 {{ time }}
的查询中使用 time
,而不是评估它,它仅作为完整的字符串传递。
请帮助,坚持了很久。
由于您想在两个运算符中使用 f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\''
而无需复制代码,您可以将其定义为用户宏。
from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
def ds_macro_format(execution_date, hours):
return execution_date.subtract(hours=hours).strftime("%Y-%m-%d %H:%M:%S")
user_macros = {
'format': ds_macro_format
}
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 6, 7),
}
dag = DAG(
"Whosebug_question1",
default_args=default_args,
schedule_interval="@daily",
user_defined_macros=user_macros
)
PostgresOperator(dag=dag,
task_id='delete_entries',
postgres_conn_id='database_connection',
sql='sql/delete_entry.sql')
和 delete_entry.sql
为:
delete from xyz where id in = 3 and time = {{ format(execution_date, hours=2) }};
假设您还想在 BashOperator 中使用宏,您可以这样做:
BashOperator(
task_id='bash_task',
bash_command='echo {{ format(execution_date, hours=2) }} ',
dag=dag,
)
我有一个 sql 文件,有一个 sql 查询 :-
delete from xyz where id in = 3 and time = '{{ execution_date.subtract(hours=2).strftime("%Y-%m-%d %H:%M:%S") }}';
这里我在 sql 查询本身中编写宏,我想从操作员调用此 sql 查询的 python 文件中传递它的值。
time = f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\''
我想将这个全局时间变量传递给 sql 文件,而不是在那里再次编写完整的宏。
PostgresOperator(dag=dag,
task_id='delete_entries',
postgres_conn_id='database_connection',
sql='sql/delete_entry.sql')
如果我在使用 jinja 模板作为 {{ time }}
的查询中使用 time
,而不是评估它,它仅作为完整的字符串传递。
请帮助,坚持了很久。
由于您想在两个运算符中使用 f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\''
而无需复制代码,您可以将其定义为用户宏。
from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
def ds_macro_format(execution_date, hours):
return execution_date.subtract(hours=hours).strftime("%Y-%m-%d %H:%M:%S")
user_macros = {
'format': ds_macro_format
}
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 6, 7),
}
dag = DAG(
"Whosebug_question1",
default_args=default_args,
schedule_interval="@daily",
user_defined_macros=user_macros
)
PostgresOperator(dag=dag,
task_id='delete_entries',
postgres_conn_id='database_connection',
sql='sql/delete_entry.sql')
和 delete_entry.sql
为:
delete from xyz where id in = 3 and time = {{ format(execution_date, hours=2) }};
假设您还想在 BashOperator 中使用宏,您可以这样做:
BashOperator(
task_id='bash_task',
bash_command='echo {{ format(execution_date, hours=2) }} ',
dag=dag,
)