气流任务引用多个以前的任务?

Airflow task to refer to multiple previous tasks?

有没有办法让一个任务需要完成多个上游任务,而这些任务仍然能够独立完成?

create_dashboard 应该需要 load_fcr 和 load_survey 才能成功完成。

我不想强迫 'survey' 任务链中的任何东西要求 'fcr' 任务链中的任何东西来完成。我希望他们并行处理并且即使一个失败了也仍然完成。但是,仪表板任务需要两者都完成加载到数据库才能开始。

fcr *-->*-->*
             \
               ---> create_dashboard
                /
survey *-->*-->*
download_fcr.set_downstream(process_fcr)
process_fcr.set_downstream(load_fcr)

download_survey.set_downstream(process_survey)
process_survey.set_downstream(load_survey)

load_survey.set_downstream(create_dashboard)
load_fcr.set_downstream(create_dashboard)

您可以将任务列表传递给set_upstream或set_downstream。在您的情况下,如果您特别想使用 set_upstream,您可以将您的依赖项描述为:

create_dashboard.set_upstream([load_fcr, load_survey])

load_fcr.set_upstream(process_fcr)
process_fcr.set_upstream(download_fcr)

load_survey.set_upstream(process_survey)
process_survey.set_upstream(download_survey)

看看 airflow's source code:即使您只将一个任务对象传递给 set_upstream,它实际上会在执行任何操作之前围绕它包装一个列表。