Airflow - how to set task dependencies between iterations of a for loop?
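I am using Airflow to run a set of tasks inside a for loop. The purpose of the loop is to iterate through a list of database table names and perform the following actions: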
for table_name in list_of_tables:
    if table exists in database (BranchPythonOperator)
        do nothing (DummyOperator)
    else:
        create table (JdbcOperator)
    insert records into table (JdbcOperator, Trigger on One Success)
In the web UI, this looks like: [image not preserved]
Currently, Airflow executes the tasks in this image from top to bottom and then left to right, like: tbl_exists_fake_table_one --> tbl_exists_fake_table_two --> tbl_create_fake_table_one, etc.
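However, the insert statement for fake_table_two depends on fake_table_one being updated, and Airflow does not currently capture that dependency. (Technically, the dependency is captured by the order of list_of_table_names, but I believe this would be error-prone in a more complex situation.)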
I would like all tasks related to fake_table_one to run first, followed by all tasks related to fake_table_two. How can I accomplish this in Airflow?
Full code below:
for tbl_name in list_of_table_names:

    # Check if table exists by querying information tables
    def has_table(tbl_name=tbl_name):
        p = JdbcHook('conn_id')
        sql = """ select count(*) from system.tables where name = '{}' """.format(tbl_name.upper())
        count = p.get_records(sql)[0][0]  # unpack the list/tuple

        # If the query didn't return rows, branch to Create Table Task
        # otherwise, branch to Dummy Operator (Airflow requires that both branches have a task)
        if count == 0:
            return 'tbl_create_{}'.format(tbl_name)
        else:
            return 'dummy_{}'.format(tbl_name)

    # run has_table python function
    exists = BranchPythonOperator(
        task_id='tbl_exists_{}'.format(tbl_name),
        python_callable=has_table,
        depends_on_past=False,
        dag=dag
    )

    # Dummy Operator
    dummy = DummyOperator(task_id='dummy_{}'.format(tbl_name), dag=dag, depends_on_past=False)

    # Run create table SQL script
    create = JdbcOperator(
        task_id='tbl_create_{}'.format(tbl_name),
        jdbc_conn_id='conn_id',
        sql=sql_parse(script_path, 'sql/sql_create/{}.sql'.format(tbl_name)),
        depends_on_past=False,
        dag=dag
    )

    # Run insert or truncate/replace SQL script
    upsert = JdbcOperator(
        task_id='tbl_upsert_{}'.format(tbl_name),
        jdbc_conn_id='conn_id',
        sql=sql_parse(script_path, 'sql/sql_upsert/{}.sql'.format(tbl_name)),
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dag=dag
    )

    # Set dependencies
    exists >> create >> upsert
    exists >> dummy >> upsert
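Store a reference to the last task added at the end of each loop iteration. Then, at the beginning of each iteration, check whether the reference exists. If it does, set it as upstream of the first task in the new iteration.
Something like this: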
last_task = None

for tbl_name in list_of_table_names:

    # run has_table python function
    exists = BranchPythonOperator(
        task_id='tbl_exists_{}'.format(tbl_name),
        python_callable=has_table,
        depends_on_past=False,
        dag=dag
    )

    if last_task:
        last_task >> exists

    ...

    # Run insert or truncate/replace SQL script
    upsert = JdbcOperator(
        task_id='tbl_upsert_{}'.format(tbl_name),
        jdbc_conn_id='conn_id',
        sql=sql_parse(script_path, 'sql/sql_upsert/{}.sql'.format(tbl_name)),
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dag=dag
    )

    last_task = upsert

    ...
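To see what the chaining does to the graph without a running Airflow instance, here is a minimal sketch. The `Task` class and `build_tasks` function are hypothetical stand-ins, not Airflow's operators: `Task` only records upstream/downstream links and mimics the `>>` syntax, so the wiring can be inspected directly. The task IDs follow the naming used in the question.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; it only tracks links."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []
        self.downstream = []

    def __rshift__(self, other):
        # Mimic `a >> b`: b is downstream of a. Returning `other` allows chaining.
        self.downstream.append(other)
        other.upstream.append(self)
        return other


def build_tasks(list_of_table_names):
    """Wire up the per-table tasks, chaining each table after the previous one."""
    tasks = {}
    last_task = None

    for tbl_name in list_of_table_names:
        exists = Task('tbl_exists_{}'.format(tbl_name))
        dummy = Task('dummy_{}'.format(tbl_name))
        create = Task('tbl_create_{}'.format(tbl_name))
        upsert = Task('tbl_upsert_{}'.format(tbl_name))

        # The previous table's final task must finish before this table starts.
        if last_task is not None:
            last_task >> exists

        exists >> create >> upsert
        exists >> dummy >> upsert

        # Remember this iteration's final task for the next iteration.
        last_task = upsert

        for task in (exists, dummy, create, upsert):
            tasks[task.task_id] = task

    return tasks


tasks = build_tasks(['fake_table_one', 'fake_table_two'])
second_exists = tasks['tbl_exists_fake_table_two']
print([t.task_id for t in second_exists.upstream])
# prints ['tbl_upsert_fake_table_one']
```

The first table's `tbl_exists_` task has no upstream, while every later table's `tbl_exists_` task waits on the previous table's `tbl_upsert_` task, which gives the one-table-at-a-time ordering asked for in the question.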