Apache Airflow:基于执行时间的动态时间间隔
Apache Airflow: dynamic time interval based on execution time
目前我有一些 DAG 每 15 分钟安排一次,它们只调用一个 DataflowTemplateOperator。我使用 "new_dag" 函数循环创建多个 DAG:
def new_dag(job):
default_args = {
'start_date': datetime(2020, 3, 11),
'retries': 0,
'dataflow_default_options': {
'project': config.PROJECT,
'region': config.REGION,
'zone': config.ZONE,
'tempLocation': config.GS_TEMP_LOCATION,
}
}
dag = DAG(dag_id=job,
schedule_interval=config.MIN15_CRON,
catchup=False,
default_args=default_args)
with dag:
quarter_start, quarter_end = get_last_quarter()
dataflow_batch = dataflow_operator.DataflowTemplateOperator(
task_id=job,
template=config.GS_TEMPLATE_LOCATION,
parameters={"startTs": quarter_start, "endTs": quarter_end}
return dag
我的自定义函数 "get_last_quarter" 从 datetime.now() 开始计算两个时间戳。例如,如果 now() 给出“2020-03-16 18:33:00”,我的函数将生成前一刻钟的时间间隔:
- 2020-03-1618:15:00
- 2020-03-1618:30:00
此代码有效,但我会使用 "execution_date" 或 "ts" 从 Airflow 宏读取时间戳,而不是 datetime.now()。
我该怎么做?
我可以在 "with dag" 中但在运算符之外将宏作为 python 变量读取?
https://airflow.apache.org/docs/stable/macros.html
我找到了添加新运算符 PythonOperator 的解决方案:
get_time = PythonOperator(
task_id="get_time",
python_callable= get_last_quarter,
provide_context=True
)
dataflow_batch = dataflow_operator.DataflowTemplateOperator
task_id=job,
template=config.GS_TEMPLATE_LOCATION,
parameters={"inputStartTs": '{{ ti.xcom_pull("get_time")[0] }}',
"inputEndTs": '{{ ti.xcom_pull("get_time")[1] }}'}
)
要在运算符之间传递参数,可以使用带有 "ti" 键(默认键)的 xcom。
在 python 函数 "get_last_quarter" 中,可以 return 只是一个值,在我的例子中是一个元组中的两个值。
目前我有一些 DAG 每 15 分钟安排一次,它们只调用一个 DataflowTemplateOperator。我使用 "new_dag" 函数循环创建多个 DAG:
def new_dag(job):
default_args = {
'start_date': datetime(2020, 3, 11),
'retries': 0,
'dataflow_default_options': {
'project': config.PROJECT,
'region': config.REGION,
'zone': config.ZONE,
'tempLocation': config.GS_TEMP_LOCATION,
}
}
dag = DAG(dag_id=job,
schedule_interval=config.MIN15_CRON,
catchup=False,
default_args=default_args)
with dag:
quarter_start, quarter_end = get_last_quarter()
dataflow_batch = dataflow_operator.DataflowTemplateOperator(
task_id=job,
template=config.GS_TEMPLATE_LOCATION,
parameters={"startTs": quarter_start, "endTs": quarter_end}
return dag
我的自定义函数 "get_last_quarter" 从 datetime.now() 开始计算两个时间戳。例如,如果 now() 给出“2020-03-16 18:33:00”,我的函数将生成前一刻钟的时间间隔:
- 2020-03-1618:15:00
- 2020-03-1618:30:00
此代码有效,但我会使用 "execution_date" 或 "ts" 从 Airflow 宏读取时间戳,而不是 datetime.now()。
我该怎么做? 我可以在 "with dag" 中但在运算符之外将宏作为 python 变量读取? https://airflow.apache.org/docs/stable/macros.html
我找到了添加新运算符 PythonOperator 的解决方案:
get_time = PythonOperator(
task_id="get_time",
python_callable= get_last_quarter,
provide_context=True
)
dataflow_batch = dataflow_operator.DataflowTemplateOperator
task_id=job,
template=config.GS_TEMPLATE_LOCATION,
parameters={"inputStartTs": '{{ ti.xcom_pull("get_time")[0] }}',
"inputEndTs": '{{ ti.xcom_pull("get_time")[1] }}'}
)
要在运算符之间传递参数,可以使用带有 "ti" 键(默认键)的 xcom。 在 python 函数 "get_last_quarter" 中,可以 return 只是一个值,在我的例子中是一个元组中的两个值。