Apache Airflow:基于执行时间的动态时间间隔

Apache Airflow: dynamic time interval based on execution time

目前我有一些 DAG 每 15 分钟安排一次,它们只调用一个 DataflowTemplateOperator。我使用 "new_dag" 函数循环创建多个 DAG:

def new_dag(job):
  default_args = {
    'start_date': datetime(2020, 3, 11),
    'retries': 0,
    'dataflow_default_options': {
        'project': config.PROJECT,
        'region': config.REGION,
        'zone': config.ZONE,
        'tempLocation': config.GS_TEMP_LOCATION,
    }
  }

  dag = DAG(dag_id=job,
          schedule_interval=config.MIN15_CRON,
          catchup=False,
          default_args=default_args)

  with dag:
    quarter_start, quarter_end = get_last_quarter()
    dataflow_batch = dataflow_operator.DataflowTemplateOperator(
        task_id=job,
        template=config.GS_TEMPLATE_LOCATION,
        parameters={"startTs": quarter_start, "endTs": quarter_end}

  return dag

我的自定义函数 "get_last_quarter" 从 datetime.now() 开始计算两个时间戳。例如,如果 now() 给出“2020-03-16 18:33:00”,我的函数将生成前一刻钟的时间间隔:

此代码有效,但我会使用 "execution_date" 或 "ts" 从 Airflow 宏读取时间戳,而不是 datetime.now()。

我该怎么做? 我可以在 "with dag" 中但在运算符之外将宏作为 python 变量读取? https://airflow.apache.org/docs/stable/macros.html

我找到了添加新运算符 PythonOperator 的解决方案:

    get_time = PythonOperator(
        task_id="get_time",
        python_callable= get_last_quarter,
        provide_context=True
    )

    dataflow_batch = dataflow_operator.DataflowTemplateOperator
        task_id=job,
        template=config.GS_TEMPLATE_LOCATION,
        parameters={"inputStartTs": '{{ ti.xcom_pull("get_time")[0] }}',
                    "inputEndTs": '{{ ti.xcom_pull("get_time")[1] }}'}
    )

要在运算符之间传递参数,可以使用带有 "ti" 键(默认键)的 xcom。 在 python 函数 "get_last_quarter" 中,可以 return 只是一个值,在我的例子中是一个元组中的两个值。