气流任务引用多个以前的任务?
Airflow task to refer to multiple previous tasks?
有没有办法让一个任务需要完成多个上游任务,而这些任务仍然能够独立完成?
- download_fcr --> process_fcr --> load_fcr
- download_survey --> process_survey --> load_survey
create_dashboard 应该需要 load_fcr 和 load_survey 才能成功完成。
我不想强迫 'survey' 任务链中的任何东西要求 'fcr' 任务链中的任何东西来完成。我希望他们并行处理并且即使一个失败了也仍然完成。但是,仪表板任务需要两者都完成加载到数据库才能开始。
fcr *-->*-->*
\
---> create_dashboard
/
survey *-->*-->*
download_fcr.set_downstream(process_fcr)
process_fcr.set_downstream(load_fcr)
download_survey.set_downstream(process_survey)
process_survey.set_downstream(load_survey)
load_survey.set_downstream(create_dashboard)
load_fcr.set_downstream(create_dashboard)
您可以将任务列表传递给set_upstream或set_downstream。在您的情况下,如果您特别想使用 set_upstream,您可以将您的依赖项描述为:
create_dashboard.set_upstream([load_fcr, load_survey])
load_fcr.set_upstream(process_fcr)
process_fcr.set_upstream(download_fcr)
load_survey.set_upstream(process_survey)
process_survey.set_upstream(download_survey)
看看 airflow's source code:即使您只将一个任务对象传递给 set_upstream,它实际上会在执行任何操作之前围绕它包装一个列表。
有没有办法让一个任务需要完成多个上游任务,而这些任务仍然能够独立完成?
- download_fcr --> process_fcr --> load_fcr
- download_survey --> process_survey --> load_survey
create_dashboard 应该需要 load_fcr 和 load_survey 才能成功完成。
我不想强迫 'survey' 任务链中的任何东西要求 'fcr' 任务链中的任何东西来完成。我希望他们并行处理并且即使一个失败了也仍然完成。但是,仪表板任务需要两者都完成加载到数据库才能开始。
fcr *-->*-->*
\
---> create_dashboard
/
survey *-->*-->*
download_fcr.set_downstream(process_fcr)
process_fcr.set_downstream(load_fcr)
download_survey.set_downstream(process_survey)
process_survey.set_downstream(load_survey)
load_survey.set_downstream(create_dashboard)
load_fcr.set_downstream(create_dashboard)
您可以将任务列表传递给set_upstream或set_downstream。在您的情况下,如果您特别想使用 set_upstream,您可以将您的依赖项描述为:
create_dashboard.set_upstream([load_fcr, load_survey])
load_fcr.set_upstream(process_fcr)
process_fcr.set_upstream(download_fcr)
load_survey.set_upstream(process_survey)
process_survey.set_upstream(download_survey)
看看 airflow's source code:即使您只将一个任务对象传递给 set_upstream,它实际上会在执行任何操作之前围绕它包装一个列表。