在 airflow 中启动一个带有可变并行任务的 subdag
Launch a subdag with variable parallel tasks in airflow
我有一个气流工作流,我想修改(见底部的插图)。
但是,我在文档中找不到这样做的方法。
我看过 subdags、分支和 xcoms,但没有成功。
似乎没有一种方法可以根据来自操作员的 return 在 subdag 中并行指定多少任务 运行。
为了增加问题,subdag 中的每个任务都会收到一个不同的参数(列表中的一个元素 return 由前一个运算符编辑)
这是我正在尝试做的事情的说明:
我也 运行 对此感兴趣,但还没有真正找到解决它的干净方法。如果您知道您将传递给每个子标签的所有不同的可能参数......那么您可以做的是将其硬编码到 DAG 文件中,并且始终使用每个可能的子标签创建 DAG。然后你有一个运算符(类似于你的 "get every n"),它获取你想要 运行 的子标签列表,并将它标记任何不在列表中的下游子标签为 skipped
。像这样:
SUBDAGS = {
'a': {'id': 'foo'},
'b': {'id': 'bar'},
'c': {'id': 'test'},
'd': {'id': 'hi'},
}
def _select_subdags(**context):
names = fetch_list() # returns ["a", "c", "d"]
tasks_to_skip = ['my_subdag_' + name for name in set(SUBDAGS) - set(names)]
session = Session()
tis = session.query(TaskInstance).filter(
TaskInstance.dag_id == context['dag'].dag_id,
TaskInstance.execution_date == context['ti'].execution_date,
TaskInstance.task_id.in_(tasks_to_skip),
)
for ti in tis:
now = datetime.utcnow()
ti.state = State.SKIPPED
ti.start_date = now
ti.end_date = now
session.merge(ti)
session.commit()
session.close()
select_subdags = PythonOperator(
task_id='select_subdags',
dag=dag,
provide_context=True,
python_callable=_select_subdags,
)
for name, params in SUBDAGS.iteritems():
child_dag_id = 'my_subdag_' + name
subdag_op = SubDagOperator(
task_id=child_dag_id,
dag=dag,
subdag=my_subdag(dag.dag_id, child_dag_id, params),
)
select_subdags >> subdag_op
显然不理想,尤其是当您最终只想 运行 数百个子标签时。我们还 运行 解决了单个 DAG 中数千个子标记的一些性能问题,因为它可能导致大量任务实例,其中大部分被简单地跳过。
我有一个气流工作流,我想修改(见底部的插图)。
但是,我在文档中找不到这样做的方法。
我看过 subdags、分支和 xcoms,但没有成功。
似乎没有一种方法可以根据来自操作员的 return 在 subdag 中并行指定多少任务 运行。
为了增加问题,subdag 中的每个任务都会收到一个不同的参数(列表中的一个元素 return 由前一个运算符编辑)
这是我正在尝试做的事情的说明:
我也 运行 对此感兴趣,但还没有真正找到解决它的干净方法。如果您知道您将传递给每个子标签的所有不同的可能参数......那么您可以做的是将其硬编码到 DAG 文件中,并且始终使用每个可能的子标签创建 DAG。然后你有一个运算符(类似于你的 "get every n"),它获取你想要 运行 的子标签列表,并将它标记任何不在列表中的下游子标签为 skipped
。像这样:
SUBDAGS = {
'a': {'id': 'foo'},
'b': {'id': 'bar'},
'c': {'id': 'test'},
'd': {'id': 'hi'},
}
def _select_subdags(**context):
names = fetch_list() # returns ["a", "c", "d"]
tasks_to_skip = ['my_subdag_' + name for name in set(SUBDAGS) - set(names)]
session = Session()
tis = session.query(TaskInstance).filter(
TaskInstance.dag_id == context['dag'].dag_id,
TaskInstance.execution_date == context['ti'].execution_date,
TaskInstance.task_id.in_(tasks_to_skip),
)
for ti in tis:
now = datetime.utcnow()
ti.state = State.SKIPPED
ti.start_date = now
ti.end_date = now
session.merge(ti)
session.commit()
session.close()
select_subdags = PythonOperator(
task_id='select_subdags',
dag=dag,
provide_context=True,
python_callable=_select_subdags,
)
for name, params in SUBDAGS.iteritems():
child_dag_id = 'my_subdag_' + name
subdag_op = SubDagOperator(
task_id=child_dag_id,
dag=dag,
subdag=my_subdag(dag.dag_id, child_dag_id, params),
)
select_subdags >> subdag_op
显然不理想,尤其是当您最终只想 运行 数百个子标签时。我们还 运行 解决了单个 DAG 中数千个子标记的一些性能问题,因为它可能导致大量任务实例,其中大部分被简单地跳过。